Implement delete operation #1324

Merged: 4 commits, Mar 13, 2020
2 changes: 2 additions & 0 deletions mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java
@@ -221,6 +221,7 @@ public String getSerializerValueWriterClassName() {
return getProperty(ES_SERIALIZATION_WRITER_VALUE_CLASS);
}


public String getSerializerBytesConverterClassName() {
return getProperty(ES_SERIALIZATION_WRITER_BYTES_CLASS);
}
@@ -765,3 +766,4 @@ public String save() {

public abstract Properties asProperties();
}

mr/src/main/java/org/elasticsearch/hadoop/rest/InitializationUtils.java
@@ -152,7 +152,7 @@ public static void filterNonDataNodesIfNeeded(Settings settings, Log log) {
}

RestClient bootstrap = new RestClient(settings);
try {
String message = "No data nodes with HTTP-enabled available";
List<NodeInfo> dataNodes = bootstrap.getHttpDataNodes();
if (dataNodes.isEmpty()) {
@@ -253,10 +253,18 @@ public static void validateSettings(Settings settings) {
Assert.isTrue(settings.getMappingExcludes().isEmpty(), "When writing data as JSON, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out...");
}

+//check the configuration is coherent in order to use the delete operation
+if (ConfigurationOptions.ES_OPERATION_DELETE.equals(settings.getOperation())) {
Review comment (Member): 👍 thanks for adding this

+Assert.isTrue(!settings.getInputAsJson(), "When using delete operation, providing data as JSON is not coherent because this operation does not need document as a payload. This is most likely not what the user intended. Bailing out...");
+Assert.isTrue(settings.getMappingIncludes().isEmpty(), "When using delete operation, the field inclusion feature is ignored. This is most likely not what the user intended. Bailing out...");
+Assert.isTrue(settings.getMappingExcludes().isEmpty(), "When using delete operation, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out...");
+Assert.isTrue(settings.getMappingId() != null && !StringUtils.EMPTY.equals(settings.getMappingId()), "When using delete operation, the property " + ConfigurationOptions.ES_MAPPING_ID + " must be set and must not be empty since we need the document id in order to delete it. Bailing out...");
+}

// Check to make sure user doesn't specify more than one script type
boolean hasScript = false;
String[] scripts = {settings.getUpdateScriptInline(), settings.getUpdateScriptFile(), settings.getUpdateScriptStored()};
-for (String script: scripts) {
+for (String script : scripts) {
boolean isSet = StringUtils.hasText(script);
Assert.isTrue((hasScript && isSet) == false, "Multiple scripts are specified. Please specify only one via [es.update.script.inline], [es.update.script.file], or [es.update.script.stored]");
hasScript = hasScript || isSet;
@@ -468,4 +476,4 @@ public static boolean setUserProviderIfNotSet(Settings settings, Class<? extends
}
return false;
}
}
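
For context, a job configuration that satisfies the new delete checks might look like the sketch below. This is an illustration, not part of the diff; the property values are example choices, and the string keys are the standard es-hadoop configuration names behind the ConfigurationOptions constants used above.

    import org.apache.hadoop.conf.Configuration;

    // Illustrative delete-job configuration (not part of this PR).
    Configuration conf = new Configuration();
    conf.set("es.resource.write", "my-index/doc");  // target resource
    conf.set("es.write.operation", "delete");       // selects ES_OPERATION_DELETE
    conf.set("es.mapping.id", "id");                // required: field carrying the document id
    // es.input.json must remain false and es.mapping.include/exclude must stay unset,
    // otherwise the assertions in validateSettings() above bail out.
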
mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/AbstractBulkFactory.java
@@ -18,13 +18,10 @@
*/
package org.elasticsearch.hadoop.serialization.bulk;

-import java.util.ArrayList;
-import java.util.List;
-import java.util.Date;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.rest.Resource;
import org.elasticsearch.hadoop.serialization.builder.ValueWriter;
@@ -44,6 +41,10 @@
import org.elasticsearch.hadoop.util.ObjectUtils;
import org.elasticsearch.hadoop.util.StringUtils;

+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;

public abstract class AbstractBulkFactory implements BulkFactory {

private static Log log = LogFactory.getLog(AbstractBulkFactory.class);
@@ -60,13 +61,13 @@ public abstract class AbstractBulkFactory implements BulkFactory {
// used when specifying an index pattern
private IndexExtractor indexExtractor;
private FieldExtractor idExtractor,
typeExtractor,
parentExtractor,
routingExtractor,
versionExtractor,
ttlExtractor,
timestampExtractor,
paramsExtractor;

private final FieldExtractor versionTypeExtractor = new FieldExtractor() {

@@ -139,14 +140,10 @@ void doWrite(Object value) {
}

pool.get().bytes(valueString);
-}
-
-else if (value instanceof Date) {
-String valueString = (value == null ? "null": Long.toString(((Date) value).getTime()));
+} else if (value instanceof Date) {
+String valueString = (value == null ? "null" : Long.toString(((Date) value).getTime()));
pool.get().bytes(valueString);
-}
-
-else if (value instanceof RawJson) {
+} else if (value instanceof RawJson) {
pool.get().bytes(((RawJson) value).json());
}
// library specific type - use the value writer (a bit overkill but handles collections/arrays properly)
@@ -249,25 +246,24 @@ private void initExtractorsFromSettings(final Settings settings) {
ttlExtractor = jsonExtractors.ttl();
timestampExtractor = jsonExtractors.timestamp();
paramsExtractor = jsonExtractors.params();
-}
-else {
+} else {
// init extractors (if needed)
if (settings.getMappingId() != null) {
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingId());
-idExtractor = ObjectUtils.<FieldExtractor> instantiate(settings.getMappingIdExtractorClassName(),
+idExtractor = ObjectUtils.<FieldExtractor>instantiate(settings.getMappingIdExtractorClassName(),
settings);
}
if (settings.getMappingParent() != null) {
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingParent());
-parentExtractor = ObjectUtils.<FieldExtractor> instantiate(
+parentExtractor = ObjectUtils.<FieldExtractor>instantiate(
settings.getMappingParentExtractorClassName(), settings);
}
// Two different properties can satisfy the routing field extraction
ChainedFieldExtractor.NoValueHandler routingResponse = ChainedFieldExtractor.NoValueHandler.SKIP;
List<FieldExtractor> routings = new ArrayList<FieldExtractor>(2);
if (settings.getMappingRouting() != null) {
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingRouting());
-FieldExtractor extractor = ObjectUtils.<FieldExtractor> instantiate(
+FieldExtractor extractor = ObjectUtils.<FieldExtractor>instantiate(
settings.getMappingRoutingExtractorClassName(), settings);
// If we specify a routing field, return NOT_FOUND if we ultimately cannot find one instead of skipping
routingResponse = ChainedFieldExtractor.NoValueHandler.NOT_FOUND;
@@ -286,22 +282,22 @@

if (settings.getMappingTtl() != null) {
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingTtl());
-ttlExtractor = ObjectUtils.<FieldExtractor> instantiate(settings.getMappingTtlExtractorClassName(),
+ttlExtractor = ObjectUtils.<FieldExtractor>instantiate(settings.getMappingTtlExtractorClassName(),
settings);
}
if (settings.getMappingVersion() != null) {
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingVersion());
-versionExtractor = ObjectUtils.<FieldExtractor> instantiate(
+versionExtractor = ObjectUtils.<FieldExtractor>instantiate(
settings.getMappingVersionExtractorClassName(), settings);
}
if (settings.getMappingTimestamp() != null) {
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingTimestamp());
-timestampExtractor = ObjectUtils.<FieldExtractor> instantiate(
+timestampExtractor = ObjectUtils.<FieldExtractor>instantiate(
settings.getMappingTimestampExtractorClassName(), settings);
}

// create adapter
-IndexExtractor iformat = ObjectUtils.<IndexExtractor> instantiate(settings.getMappingIndexExtractorClassName(), settings);
+IndexExtractor iformat = ObjectUtils.<IndexExtractor>instantiate(settings.getMappingIndexExtractorClassName(), settings);
iformat.compile(new Resource(settings, false).toString());

if (iformat.hasPattern()) {
@@ -371,14 +367,15 @@ public BulkCommand createBulk() {
if (!isStatic) {
before.add(new DynamicHeaderRef());
after.add(new DynamicEndRef());
-}
-else {
+} else {
writeObjectHeader(before);
before = compact(before);
writeObjectEnd(after);
after = compact(after);
}

+if (ConfigurationOptions.ES_OPERATION_DELETE.equals(getOperation())) {
+return new DeleteTemplatedBulk(before, after);
+}
boolean isScriptUpdate = settings.hasUpdateScript();
// compress pieces
if (jsonInput) {
@@ -523,15 +520,13 @@ private List<Object> compact(List<Object> list) {
stringAccumulator.setLength(0);
}
compacted.add(object);
-}
-else if (object instanceof FieldExtractor) {
+} else if (object instanceof FieldExtractor) {
if (stringAccumulator.length() > 0) {
compacted.add(new BytesArray(stringAccumulator.toString()));
stringAccumulator.setLength(0);
}
compacted.add(new FieldWriter((FieldExtractor) object));
-}
-else {
+} else {
stringAccumulator.append(object.toString());
}
}
@@ -546,3 +541,4 @@
return paramsExtractor;
}
}

mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/BulkCommands.java
@@ -45,8 +45,11 @@ else if (ConfigurationOptions.ES_OPERATION_UPDATE.equals(operation)) {
else if (ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation)) {
factory = new UpdateBulkFactory(settings, true, metaExtractor, version);
}
+else if (ConfigurationOptions.ES_OPERATION_DELETE.equals(operation)) {
+factory = new DeleteBulkFactory(settings, metaExtractor, version);
+}
else {
throw new EsHadoopIllegalArgumentException("Unknown operation " + operation);
throw new EsHadoopIllegalArgumentException("Unsupported bulk operation " + operation);
}

return factory.createBulk();
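
To see the new branch end to end, a test-style sketch can be used. This is a hypothetical illustration, not part of the diff: TestSettings and EsMajorVersion.V_7_X are assumptions borrowed from the project's test utilities.

    Settings settings = new TestSettings();
    settings.setProperty(ConfigurationOptions.ES_RESOURCE_WRITE, "my-index/doc");
    settings.setProperty(ConfigurationOptions.ES_WRITE_OPERATION, ConfigurationOptions.ES_OPERATION_DELETE);
    settings.setProperty(ConfigurationOptions.ES_MAPPING_ID, "id");

    // With es.write.operation=delete, create(...) now routes through DeleteBulkFactory
    // and returns the delete-specific bulk command.
    BulkCommand command = BulkCommands.create(settings, null, EsMajorVersion.V_7_X);
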
mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/DeleteBulkFactory.java (new file)
@@ -0,0 +1,61 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.serialization.bulk;

import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.io.VLongWritable;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.serialization.Generator;
import org.elasticsearch.hadoop.serialization.builder.ValueWriter;
import org.elasticsearch.hadoop.util.EsMajorVersion;
import org.elasticsearch.hadoop.util.StringUtils;

import java.util.List;

public class DeleteBulkFactory extends AbstractBulkFactory {


public DeleteBulkFactory(Settings settings, MetadataExtractor metaExtractor, EsMajorVersion version) {
super(settings, metaExtractor, version);
}

@Override
protected String getOperation() {
return ConfigurationOptions.ES_OPERATION_DELETE;
}

@Override
protected void writeObjectEnd(List<Object> list) {
// skip adding new-line for each entity as delete doesn't need entity output
list.add(StringUtils.EMPTY);
}
}
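
The empty writeObjectEnd lines up with the _bulk wire format: index and update actions are followed by a source line, while delete is metadata-only, so there is no entity payload to terminate with a newline. A shape comparison (hypothetical ids and fields, for illustration only):

    // index: action-and-metadata line, then a source line
    String indexEntry  = "{\"index\":{\"_id\":\"1\"}}\n"
                       + "{\"field\":\"value\"}\n";
    // delete: action-and-metadata line only (no payload follows)
    String deleteEntry = "{\"delete\":{\"_id\":\"1\"}}\n";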

mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/DeleteTemplatedBulk.java (new file)
@@ -0,0 +1,41 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.serialization.bulk;

import org.elasticsearch.hadoop.util.BytesRef;

import java.util.Collection;

public class DeleteTemplatedBulk extends TemplatedBulk {

DeleteTemplatedBulk(Collection<Object> beforeObject, Collection<Object> afterObject) {
super(beforeObject, afterObject, null);
}

@Override
public BytesRef write(Object object) {
ref.reset();
scratchPad.reset();
Object processed = preProcess(object, scratchPad);
writeTemplate(beforeObject, processed);
writeTemplate(afterObject, processed);
return ref;
}
}
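
For comparison, the base TemplatedBulk.write that this class overrides looks roughly like the sketch below (reconstructed for illustration, not part of the diff; see the TemplatedBulk changes further down). The delete variant simply drops the document-serialization step between the two templates.

    // TemplatedBulk.write, approximately:
    public BytesRef write(Object object) {
        ref.reset();
        scratchPad.reset();
        Object processed = preProcess(object, scratchPad);
        writeTemplate(beforeObject, processed);
        doWriteObject(processed, scratchPad, valueWriter); // skipped by DeleteTemplatedBulk
        ref.add(scratchPad);                               // nothing to append for a delete
        writeTemplate(afterObject, processed);
        return ref;
    }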

mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/TemplatedBulk.java
@@ -32,11 +32,11 @@

class TemplatedBulk implements BulkCommand {

-private final Collection<Object> beforeObject;
-private final Collection<Object> afterObject;
+protected final Collection<Object> beforeObject;
+protected final Collection<Object> afterObject;

-private BytesArray scratchPad = new BytesArray(1024);
-private BytesRef ref = new BytesRef();
+protected BytesArray scratchPad = new BytesArray(1024);
+protected BytesRef ref = new BytesRef();

private final ValueWriter valueWriter;

@@ -71,7 +71,7 @@ protected void doWriteObject(Object object, BytesArray storage, ValueWriter<?> w
ContentBuilder.generate(bos, writer).value(object).flush().close();
}

-private void writeTemplate(Collection<Object> template, Object object) {
+protected void writeTemplate(Collection<Object> template, Object object) {
for (Object item : template) {
if (item instanceof BytesArray) {
ref.add((BytesArray) item);
@@ -89,4 +89,4 @@ else if (item instanceof DynamicContentRef) {
}
}
}
}