Skip to content

Commit

Permalink
Implement a new BulkCommand to support the delete operation
Browse files Browse the repository at this point in the history
  • Loading branch information
vincentarnaud90 committed Oct 11, 2019
1 parent e9a7455 commit db71a2d
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 241 deletions.
5 changes: 1 addition & 4 deletions mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,6 @@ public String getSerializerValueWriterClassName() {
return getProperty(ES_SERIALIZATION_WRITER_VALUE_CLASS);
}

/**
 * Sets the fully qualified class name of the value writer used to serialize
 * document values (stored under {@code ES_SERIALIZATION_WRITER_VALUE_CLASS};
 * read back via {@code getSerializerValueWriterClassName()}).
 *
 * @param className fully qualified class name of the value writer implementation
 * @return this {@code Settings} instance, for fluent chaining
 */
public Settings setSerializerValueWriterClassName(String className) {
setProperty(ES_SERIALIZATION_WRITER_VALUE_CLASS, className);
return this;
}

public String getSerializerBytesConverterClassName() {
return getProperty(ES_SERIALIZATION_WRITER_BYTES_CLASS);
Expand Down Expand Up @@ -770,3 +766,4 @@ public String save() {

public abstract Properties asProperties();
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@
*/
package org.elasticsearch.hadoop.serialization.bulk;

import java.util.ArrayList;
import java.util.List;
import java.util.Date;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.rest.Resource;
import org.elasticsearch.hadoop.serialization.builder.ValueWriter;
Expand All @@ -44,6 +41,10 @@
import org.elasticsearch.hadoop.util.ObjectUtils;
import org.elasticsearch.hadoop.util.StringUtils;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public abstract class AbstractBulkFactory implements BulkFactory {

private static Log log = LogFactory.getLog(AbstractBulkFactory.class);
Expand All @@ -60,13 +61,13 @@ public abstract class AbstractBulkFactory implements BulkFactory {
// used when specifying an index pattern
private IndexExtractor indexExtractor;
private FieldExtractor idExtractor,
typeExtractor,
parentExtractor,
routingExtractor,
versionExtractor,
ttlExtractor,
timestampExtractor,
paramsExtractor;
typeExtractor,
parentExtractor,
routingExtractor,
versionExtractor,
ttlExtractor,
timestampExtractor,
paramsExtractor;

private final FieldExtractor versionTypeExtractor = new FieldExtractor() {

Expand Down Expand Up @@ -139,14 +140,10 @@ void doWrite(Object value) {
}

pool.get().bytes(valueString);
}

else if (value instanceof Date) {
String valueString = (value == null ? "null": Long.toString(((Date) value).getTime()));
} else if (value instanceof Date) {
String valueString = (value == null ? "null" : Long.toString(((Date) value).getTime()));
pool.get().bytes(valueString);
}

else if (value instanceof RawJson) {
} else if (value instanceof RawJson) {
pool.get().bytes(((RawJson) value).json());
}
// library specific type - use the value writer (a bit overkill but handles collections/arrays properly)
Expand Down Expand Up @@ -249,25 +246,24 @@ private void initExtractorsFromSettings(final Settings settings) {
ttlExtractor = jsonExtractors.ttl();
timestampExtractor = jsonExtractors.timestamp();
paramsExtractor = jsonExtractors.params();
}
else {
} else {
// init extractors (if needed)
if (settings.getMappingId() != null) {
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingId());
idExtractor = ObjectUtils.<FieldExtractor> instantiate(settings.getMappingIdExtractorClassName(),
idExtractor = ObjectUtils.<FieldExtractor>instantiate(settings.getMappingIdExtractorClassName(),
settings);
}
if (settings.getMappingParent() != null) {
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingParent());
parentExtractor = ObjectUtils.<FieldExtractor> instantiate(
parentExtractor = ObjectUtils.<FieldExtractor>instantiate(
settings.getMappingParentExtractorClassName(), settings);
}
// Two different properties can satisfy the routing field extraction
ChainedFieldExtractor.NoValueHandler routingResponse = ChainedFieldExtractor.NoValueHandler.SKIP;
List<FieldExtractor> routings = new ArrayList<FieldExtractor>(2);
if (settings.getMappingRouting() != null) {
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingRouting());
FieldExtractor extractor = ObjectUtils.<FieldExtractor> instantiate(
FieldExtractor extractor = ObjectUtils.<FieldExtractor>instantiate(
settings.getMappingRoutingExtractorClassName(), settings);
// If we specify a routing field, return NOT_FOUND if we ultimately cannot find one instead of skipping
routingResponse = ChainedFieldExtractor.NoValueHandler.NOT_FOUND;
Expand All @@ -286,22 +282,22 @@ private void initExtractorsFromSettings(final Settings settings) {

if (settings.getMappingTtl() != null) {
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingTtl());
ttlExtractor = ObjectUtils.<FieldExtractor> instantiate(settings.getMappingTtlExtractorClassName(),
ttlExtractor = ObjectUtils.<FieldExtractor>instantiate(settings.getMappingTtlExtractorClassName(),
settings);
}
if (settings.getMappingVersion() != null) {
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingVersion());
versionExtractor = ObjectUtils.<FieldExtractor> instantiate(
versionExtractor = ObjectUtils.<FieldExtractor>instantiate(
settings.getMappingVersionExtractorClassName(), settings);
}
if (settings.getMappingTimestamp() != null) {
settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingTimestamp());
timestampExtractor = ObjectUtils.<FieldExtractor> instantiate(
timestampExtractor = ObjectUtils.<FieldExtractor>instantiate(
settings.getMappingTimestampExtractorClassName(), settings);
}

// create adapter
IndexExtractor iformat = ObjectUtils.<IndexExtractor> instantiate(settings.getMappingIndexExtractorClassName(), settings);
IndexExtractor iformat = ObjectUtils.<IndexExtractor>instantiate(settings.getMappingIndexExtractorClassName(), settings);
iformat.compile(new Resource(settings, false).toString());

if (iformat.hasPattern()) {
Expand Down Expand Up @@ -371,14 +367,15 @@ public BulkCommand createBulk() {
if (!isStatic) {
before.add(new DynamicHeaderRef());
after.add(new DynamicEndRef());
}
else {
} else {
writeObjectHeader(before);
before = compact(before);
writeObjectEnd(after);
after = compact(after);
}

if (ConfigurationOptions.ES_OPERATION_DELETE.equals(getOperation())) {
return new DeleteTemplatedBulk(before, after);
}
boolean isScriptUpdate = settings.hasUpdateScript();
// compress pieces
if (jsonInput) {
Expand Down Expand Up @@ -523,15 +520,13 @@ private List<Object> compact(List<Object> list) {
stringAccumulator.setLength(0);
}
compacted.add(object);
}
else if (object instanceof FieldExtractor) {
} else if (object instanceof FieldExtractor) {
if (stringAccumulator.length() > 0) {
compacted.add(new BytesArray(stringAccumulator.toString()));
stringAccumulator.setLength(0);
}
compacted.add(new FieldWriter((FieldExtractor) object));
}
else {
} else {
stringAccumulator.append(object.toString());
}
}
Expand All @@ -546,3 +541,4 @@ protected FieldExtractor getParamExtractor() {
return paramsExtractor;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -42,48 +42,9 @@

public class DeleteBulkFactory extends AbstractBulkFactory {

/**
 * Value writer used for delete operations: a delete action carries no document
 * body, but the incoming record must still be serialized so that metadata
 * (id, routing, etc.) can be extracted from it. Only primitive Hadoop
 * {@code Writable} wrappers are rendered; unrecognized types are ignored.
 */
public static final class NoDataWriter implements ValueWriter<Object> {

    @Override
    public Result write(Object writable, Generator generator) {
        emit(writable, generator);
        // Serialization of unsupported types is silently skipped, so this
        // writer always reports success.
        return Result.SUCCESFUL();
    }

    // Renders a single Writable wrapper onto the generator, one guard per supported type.
    private void emit(Object writable, Generator generator) {
        if (writable == null || writable instanceof NullWritable) {
            generator.writeNull();
            return;
        }
        if (writable instanceof Text) {
            Text text = (Text) writable;
            generator.writeUTF8String(text.getBytes(), 0, text.getLength());
            return;
        }
        if (writable instanceof UTF8) {
            UTF8 utf8 = (UTF8) writable;
            generator.writeUTF8String(utf8.getBytes(), 0, utf8.getLength());
            return;
        }
        if (writable instanceof IntWritable) {
            generator.writeNumber(((IntWritable) writable).get());
            return;
        }
        if (writable instanceof LongWritable) {
            generator.writeNumber(((LongWritable) writable).get());
            return;
        }
        if (writable instanceof VLongWritable) {
            generator.writeNumber(((VLongWritable) writable).get());
            return;
        }
        if (writable instanceof VIntWritable) {
            generator.writeNumber(((VIntWritable) writable).get());
            return;
        }
        if (writable instanceof ByteWritable) {
            generator.writeNumber(((ByteWritable) writable).get());
            return;
        }
        if (writable instanceof DoubleWritable) {
            generator.writeNumber(((DoubleWritable) writable).get());
            return;
        }
        if (writable instanceof FloatWritable) {
            generator.writeNumber(((FloatWritable) writable).get());
            return;
        }
        if (writable instanceof BooleanWritable) {
            generator.writeBoolean(((BooleanWritable) writable).get());
            return;
        }
        if (writable instanceof BytesWritable) {
            BytesWritable bytes = (BytesWritable) writable;
            generator.writeBinary(bytes.getBytes(), 0, bytes.getLength());
            return;
        }
        if (writable instanceof MD5Hash) {
            generator.writeString(writable.toString());
        }
    }
}

/**
 * Creates a bulk factory that produces delete commands.
 * <p>
 * Unlike the previous implementation, no dedicated serializer is forced onto
 * the settings here; the configured settings are passed through unchanged.
 *
 * @param settings      job configuration (passed through as-is)
 * @param metaExtractor extractor for per-document metadata, may be {@code null}
 * @param version       target Elasticsearch major version
 */
public DeleteBulkFactory(Settings settings, MetadataExtractor metaExtractor, EsMajorVersion version) {
    super(settings, metaExtractor, version);
}

@Override
Expand All @@ -97,3 +58,4 @@ protected void writeObjectEnd(List<Object> list) {
list.add(StringUtils.EMPTY);
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.serialization.bulk;

import org.elasticsearch.hadoop.util.BytesRef;

import java.util.Collection;

/**
 * Bulk command for delete operations. A delete action consists solely of the
 * action/metadata line, so no value writer is needed ({@code null} is passed
 * to the parent) and {@link #write(Object)} emits only the before/after
 * templates without serializing a document body.
 */
public class DeleteTemplatedBulk extends TemplatedBulk {

    DeleteTemplatedBulk(Collection<Object> beforeObject, Collection<Object> afterObject) {
        // No ValueWriter: deletes never serialize document content.
        super(beforeObject, afterObject, null);
    }

    @Override
    public BytesRef write(Object object) {
        // Start from clean buffers for every entry.
        ref.reset();
        scratchPad.reset();
        // Pre-processing still runs so metadata (id, routing, ...) can be
        // extracted from the record, even though its body is never written.
        Object entity = preProcess(object, scratchPad);
        writeTemplate(beforeObject, entity);
        writeTemplate(afterObject, entity);
        return ref;
    }
}

Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@

class TemplatedBulk implements BulkCommand {

private final Collection<Object> beforeObject;
private final Collection<Object> afterObject;
protected final Collection<Object> beforeObject;
protected final Collection<Object> afterObject;

private BytesArray scratchPad = new BytesArray(1024);
private BytesRef ref = new BytesRef();
protected BytesArray scratchPad = new BytesArray(1024);
protected BytesRef ref = new BytesRef();

private final ValueWriter valueWriter;

Expand Down Expand Up @@ -71,7 +71,7 @@ protected void doWriteObject(Object object, BytesArray storage, ValueWriter<?> w
ContentBuilder.generate(bos, writer).value(object).flush().close();
}

private void writeTemplate(Collection<Object> template, Object object) {
protected void writeTemplate(Collection<Object> template, Object object) {
for (Object item : template) {
if (item instanceof BytesArray) {
ref.add((BytesArray) item);
Expand All @@ -89,4 +89,4 @@ else if (item instanceof DynamicContentRef) {
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,6 @@
*/
package org.elasticsearch.hadoop.serialization;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.cfg.Settings;
Expand All @@ -33,7 +26,6 @@
import org.elasticsearch.hadoop.serialization.builder.JdkValueWriter;
import org.elasticsearch.hadoop.serialization.bulk.BulkCommand;
import org.elasticsearch.hadoop.serialization.bulk.BulkCommands;
import org.elasticsearch.hadoop.serialization.bulk.DeleteBulkFactory;
import org.elasticsearch.hadoop.util.BytesArray;
import org.elasticsearch.hadoop.util.EsMajorVersion;
import org.elasticsearch.hadoop.util.StringUtils;
Expand All @@ -44,8 +36,12 @@
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeFalse;
import static org.junit.Assume.assumeTrue;

Expand Down Expand Up @@ -84,7 +80,7 @@ public static Collection<Object[]> data() {
for (EsMajorVersion version : versions) {
for (boolean asJson : asJsons) {
for (String operation : operations) {
result.add(new Object[]{ operation, asJson, version });
result.add(new Object[]{operation, asJson, version});
}
}
}
Expand Down Expand Up @@ -317,7 +313,7 @@ public void testUpdateOnlyInlineScript5X() throws Exception {
create(set).write(data).copyTo(ba);
String result =
"{\"" + operation + "\":{\"_id\":2,\"_retry_on_conflict\":3}}\n" +
"{\"script\":{\"inline\":\"counter = 3\",\"lang\":\"groovy\"}}\n";
"{\"script\":{\"inline\":\"counter = 3\",\"lang\":\"groovy\"}}\n";
assertEquals(result, ba.toString());
}

Expand Down Expand Up @@ -447,7 +443,7 @@ public void testUpdateOnlyParamInlineScript5X() throws Exception {

String result =
"{\"" + operation + "\":{\"_id\":1}}\n" +
"{\"script\":{\"inline\":\"counter = param1; anothercounter = param2\",\"lang\":\"groovy\",\"params\":{\"param1\":1,\"param2\":1}}}\n";
"{\"script\":{\"inline\":\"counter = param1; anothercounter = param2\",\"lang\":\"groovy\",\"params\":{\"param1\":1,\"param2\":1}}}\n";

assertEquals(result, ba.toString());
}
Expand Down Expand Up @@ -523,11 +519,7 @@ private Settings settings() {

set.setInternalVersion(version);
set.setProperty(ConfigurationOptions.ES_INPUT_JSON, Boolean.toString(jsonInput));
if (isDeleteOP()) {
InitializationUtils.setValueWriterIfNotSet(set, DeleteBulkFactory.NoDataWriter.class, null);
} else {
InitializationUtils.setValueWriterIfNotSet(set, JdkValueWriter.class, null);
}
InitializationUtils.setValueWriterIfNotSet(set, JdkValueWriter.class, null);
InitializationUtils.setFieldExtractorIfNotSet(set, MapFieldExtractor.class, null);
InitializationUtils.setBytesConverterIfNeeded(set, JdkBytesConverter.class, null);
InitializationUtils.setUserProviderIfNotSet(set, HadoopUserProvider.class, null);
Expand Down Expand Up @@ -577,3 +569,4 @@ private boolean isDeleteOP() {
return ConfigurationOptions.ES_OPERATION_DELETE.equals(operation);
}
}

Loading

0 comments on commit db71a2d

Please sign in to comment.