From db71a2da4e45f011511dfc14eadc213786c1101a Mon Sep 17 00:00:00 2001
From: Vincent Arnaud
Date: Fri, 11 Oct 2019 05:47:19 +0200
Subject: [PATCH] implement a new BulkCommand to support delete operation

---
 .../elasticsearch/hadoop/cfg/Settings.java    |   5 +-
 .../bulk/AbstractBulkFactory.java             |  64 ++++----
 .../serialization/bulk/DeleteBulkFactory.java |  42 +-----
 .../bulk/DeleteTemplatedBulk.java             |  41 +++++
 .../serialization/bulk/TemplatedBulk.java     |  12 +-
 .../hadoop/serialization/CommandTest.java     |  27 ++--
 .../NoDataWriterTypeToJsonTest.java           | 140 ------------------
 7 files changed, 90 insertions(+), 241 deletions(-)
 create mode 100644 mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/DeleteTemplatedBulk.java
 delete mode 100644 mr/src/test/java/org/elasticsearch/hadoop/serialization/NoDataWriterTypeToJsonTest.java

diff --git a/mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java b/mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java
index 317db80ae..6ffe2e030 100644
--- a/mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java
+++ b/mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java
@@ -221,10 +221,6 @@ public String getSerializerValueWriterClassName() {
         return getProperty(ES_SERIALIZATION_WRITER_VALUE_CLASS);
     }
 
-    public Settings setSerializerValueWriterClassName(String className) {
-        setProperty(ES_SERIALIZATION_WRITER_VALUE_CLASS, className);
-        return this;
-    }
 
     public String getSerializerBytesConverterClassName() {
         return getProperty(ES_SERIALIZATION_WRITER_BYTES_CLASS);
@@ -770,3 +766,4 @@ public String save() {
 
     public abstract Properties asProperties();
 }
+
diff --git a/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/AbstractBulkFactory.java b/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/AbstractBulkFactory.java
index bb5be9445..054e540d3 100644
--- a/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/AbstractBulkFactory.java
+++ b/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/AbstractBulkFactory.java
@@ -18,13 +18,10 @@
  */
 package org.elasticsearch.hadoop.serialization.bulk;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Date;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
+import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
 import org.elasticsearch.hadoop.cfg.Settings;
 import org.elasticsearch.hadoop.rest.Resource;
 import org.elasticsearch.hadoop.serialization.builder.ValueWriter;
@@ -44,6 +41,10 @@
 import org.elasticsearch.hadoop.util.ObjectUtils;
 import org.elasticsearch.hadoop.util.StringUtils;
 
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
 public abstract class AbstractBulkFactory implements BulkFactory {
 
     private static Log log = LogFactory.getLog(AbstractBulkFactory.class);
@@ -60,13 +61,13 @@ public abstract class AbstractBulkFactory implements BulkFactory {
 
     // used when specifying an index pattern
     private IndexExtractor indexExtractor;
     private FieldExtractor idExtractor,
-            typeExtractor,
-            parentExtractor,
-            routingExtractor,
-            versionExtractor,
-            ttlExtractor,
-            timestampExtractor,
-            paramsExtractor;
+                           typeExtractor,
+                           parentExtractor,
+                           routingExtractor,
+                           versionExtractor,
+                           ttlExtractor,
+                           timestampExtractor,
+                           paramsExtractor;
 
     private final FieldExtractor versionTypeExtractor = new FieldExtractor() {
@@ -139,14 +140,10 @@ void doWrite(Object value) {
                 }
 
                 pool.get().bytes(valueString);
-            }
-
-            else if (value instanceof Date) {
-                String valueString = (value == null ? "null": Long.toString(((Date) value).getTime()));
+            } else if (value instanceof Date) {
+                String valueString = (value == null ? "null" : Long.toString(((Date) value).getTime()));
                 pool.get().bytes(valueString);
-            }
-
-            else if (value instanceof RawJson) {
+            } else if (value instanceof RawJson) {
                 pool.get().bytes(((RawJson) value).json());
             }
             // library specific type - use the value writer (a bit overkill but handles collections/arrays properly)
@@ -249,17 +246,16 @@ private void initExtractorsFromSettings(final Settings settings) {
             ttlExtractor = jsonExtractors.ttl();
             timestampExtractor = jsonExtractors.timestamp();
             paramsExtractor = jsonExtractors.params();
-        }
-        else {
+        } else {
             // init extractors (if needed)
             if (settings.getMappingId() != null) {
                 settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingId());
-                idExtractor = ObjectUtils.<FieldExtractor> instantiate(settings.getMappingIdExtractorClassName(),
+                idExtractor = ObjectUtils.<FieldExtractor>instantiate(settings.getMappingIdExtractorClassName(),
                         settings);
             }
             if (settings.getMappingParent() != null) {
                 settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingParent());
-                parentExtractor = ObjectUtils.<FieldExtractor> instantiate(
+                parentExtractor = ObjectUtils.<FieldExtractor>instantiate(
                         settings.getMappingParentExtractorClassName(), settings);
             }
             // Two different properties can satisfy the routing field extraction
@@ -267,7 +263,7 @@ private void initExtractorsFromSettings(final Settings settings) {
             List<FieldExtractor> routings = new ArrayList<FieldExtractor>(2);
             if (settings.getMappingRouting() != null) {
                 settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingRouting());
-                FieldExtractor extractor = ObjectUtils.<FieldExtractor> instantiate(
+                FieldExtractor extractor = ObjectUtils.<FieldExtractor>instantiate(
                         settings.getMappingRoutingExtractorClassName(), settings);
                 // If we specify a routing field, return NOT_FOUND if we ultimately cannot find one instead of skipping
                 routingResponse = ChainedFieldExtractor.NoValueHandler.NOT_FOUND;
@@ -286,22 +282,22 @@ private void initExtractorsFromSettings(final Settings settings) {
             if (settings.getMappingTtl() != null) {
                 settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingTtl());
-                ttlExtractor = ObjectUtils.<FieldExtractor> instantiate(settings.getMappingTtlExtractorClassName(),
+                ttlExtractor = ObjectUtils.<FieldExtractor>instantiate(settings.getMappingTtlExtractorClassName(),
                         settings);
             }
             if (settings.getMappingVersion() != null) {
                 settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingVersion());
-                versionExtractor = ObjectUtils.<FieldExtractor> instantiate(
+                versionExtractor = ObjectUtils.<FieldExtractor>instantiate(
                         settings.getMappingVersionExtractorClassName(), settings);
             }
             if (settings.getMappingTimestamp() != null) {
                 settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingTimestamp());
-                timestampExtractor = ObjectUtils.<FieldExtractor> instantiate(
+                timestampExtractor = ObjectUtils.<FieldExtractor>instantiate(
                         settings.getMappingTimestampExtractorClassName(), settings);
             }
         }
 
         // create adapter
-        IndexExtractor iformat = ObjectUtils.<IndexExtractor> instantiate(settings.getMappingIndexExtractorClassName(), settings);
+        IndexExtractor iformat = ObjectUtils.<IndexExtractor>instantiate(settings.getMappingIndexExtractorClassName(), settings);
         iformat.compile(new Resource(settings, false).toString());
 
         if (iformat.hasPattern()) {
@@ -371,14 +367,15 @@ public BulkCommand createBulk() {
         if (!isStatic) {
             before.add(new DynamicHeaderRef());
             after.add(new DynamicEndRef());
-        }
-        else {
+        } else {
             writeObjectHeader(before);
             before = compact(before);
             writeObjectEnd(after);
             after = compact(after);
         }
-
+        if (ConfigurationOptions.ES_OPERATION_DELETE.equals(getOperation())) {
+            return new DeleteTemplatedBulk(before, after);
+        }
         boolean isScriptUpdate = settings.hasUpdateScript();
         // compress pieces
         if (jsonInput) {
@@ -523,15 +520,13 @@ private List<Object> compact(List<Object> list) {
                     stringAccumulator.setLength(0);
                 }
                 compacted.add(object);
-            }
-            else if (object instanceof FieldExtractor) {
+            } else if (object instanceof FieldExtractor) {
                 if (stringAccumulator.length() > 0) {
                     compacted.add(new BytesArray(stringAccumulator.toString()));
                     stringAccumulator.setLength(0);
                 }
                 compacted.add(new FieldWriter((FieldExtractor) object));
-            }
-            else {
+            } else {
                 stringAccumulator.append(object.toString());
             }
         }
@@ -546,3 +541,4 @@ protected FieldExtractor getParamExtractor() {
         return paramsExtractor;
     }
 }
+
diff --git a/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/DeleteBulkFactory.java b/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/DeleteBulkFactory.java
index fc0d9a049..8ce784542 100644
--- a/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/DeleteBulkFactory.java
+++ b/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/DeleteBulkFactory.java
@@ -42,48 +42,9 @@
 
 public class DeleteBulkFactory extends AbstractBulkFactory {
 
-    public static final class NoDataWriter implements ValueWriter<Object> {
-
-        @Override
-        public Result write(Object writable, Generator generator) {
-            //delete doesn't require any content but it needs to extract metadata associated to a document
-            if (writable == null || writable instanceof NullWritable) {
-                generator.writeNull();
-            } else if (writable instanceof Text) {
-                Text text = (Text) writable;
-                generator.writeUTF8String(text.getBytes(), 0, text.getLength());
-            } else if (writable instanceof UTF8) {
-                UTF8 utf8 = (UTF8) writable;
-                generator.writeUTF8String(utf8.getBytes(), 0, utf8.getLength());
-            } else if (writable instanceof IntWritable) {
-                generator.writeNumber(((IntWritable) writable).get());
-            } else if (writable instanceof LongWritable) {
-                generator.writeNumber(((LongWritable) writable).get());
-            } else if (writable instanceof VLongWritable) {
-                generator.writeNumber(((VLongWritable) writable).get());
-            } else if (writable instanceof VIntWritable) {
-                generator.writeNumber(((VIntWritable) writable).get());
-            } else if (writable instanceof ByteWritable) {
-                generator.writeNumber(((ByteWritable) writable).get());
-            } else if (writable instanceof DoubleWritable) {
-                generator.writeNumber(((DoubleWritable) writable).get());
-            } else if (writable instanceof FloatWritable) {
-                generator.writeNumber(((FloatWritable) writable).get());
-            } else if (writable instanceof BooleanWritable) {
-                generator.writeBoolean(((BooleanWritable) writable).get());
-            } else if (writable instanceof BytesWritable) {
-                BytesWritable bw = (BytesWritable) writable;
-                generator.writeBinary(bw.getBytes(), 0, bw.getLength());
-            } else if (writable instanceof MD5Hash) {
-                generator.writeString(writable.toString());
-            }
-            return Result.SUCCESFUL();
-        }
-    }
 
     public DeleteBulkFactory(Settings settings, MetadataExtractor metaExtractor, EsMajorVersion version) {
-        // we only want a specific serializer for this particular bulk factory
-        super(settings.copy().setSerializerValueWriterClassName(NoDataWriter.class.getName()), metaExtractor, version);
+        super(settings, metaExtractor, version);
     }
 
     @Override
@@ -97,3 +58,4 @@ protected void writeObjectEnd(List<Object> list) {
         list.add(StringUtils.EMPTY);
     }
 }
+
diff --git a/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/DeleteTemplatedBulk.java b/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/DeleteTemplatedBulk.java
new file mode 100644
index 000000000..69ca78d26
--- /dev/null
+++ b/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/DeleteTemplatedBulk.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.hadoop.serialization.bulk;
+
+import org.elasticsearch.hadoop.util.BytesRef;
+
+import java.util.Collection;
+
+public class DeleteTemplatedBulk extends TemplatedBulk {
+
+    DeleteTemplatedBulk(Collection<Object> beforeObject, Collection<Object> afterObject) {
+        super(beforeObject, afterObject, null);
+    }
+
+    @Override
+    public BytesRef write(Object object) {
+        ref.reset();
+        scratchPad.reset();
+        Object processed = preProcess(object, scratchPad);
+        writeTemplate(beforeObject, processed);
+        writeTemplate(afterObject, processed);
+        return ref;
+    }
+}
+
diff --git a/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/TemplatedBulk.java b/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/TemplatedBulk.java
index 26ed9f606..26ec13e13 100644
--- a/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/TemplatedBulk.java
+++ b/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/TemplatedBulk.java
@@ -32,11 +32,11 @@
 
 class TemplatedBulk implements BulkCommand {
 
-    private final Collection<Object> beforeObject;
-    private final Collection<Object> afterObject;
+    protected final Collection<Object> beforeObject;
+    protected final Collection<Object> afterObject;
 
-    private BytesArray scratchPad = new BytesArray(1024);
-    private BytesRef ref = new BytesRef();
+    protected BytesArray scratchPad = new BytesArray(1024);
+    protected BytesRef ref = new BytesRef();
 
     private final ValueWriter<Object> valueWriter;
 
@@ -71,7 +71,7 @@ protected void doWriteObject(Object object, BytesArray storage, ValueWriter<?> w
         ContentBuilder.generate(bos, writer).value(object).flush().close();
     }
 
-    private void writeTemplate(Collection<Object> template, Object object) {
+    protected void writeTemplate(Collection<Object> template, Object object) {
         for (Object item : template) {
             if (item instanceof BytesArray) {
                 ref.add((BytesArray) item);
@@ -89,4 +89,4 @@ else if (item instanceof DynamicContentRef) {
             }
         }
     }
-}
\ No newline at end of file
+}
diff --git a/mr/src/test/java/org/elasticsearch/hadoop/serialization/CommandTest.java b/mr/src/test/java/org/elasticsearch/hadoop/serialization/CommandTest.java
index abf5c18cf..82ce166e1 100644
--- a/mr/src/test/java/org/elasticsearch/hadoop/serialization/CommandTest.java
+++ b/mr/src/test/java/org/elasticsearch/hadoop/serialization/CommandTest.java
@@ -18,13 +18,6 @@
  */
 package org.elasticsearch.hadoop.serialization;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
 import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
 import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
 import org.elasticsearch.hadoop.cfg.Settings;
@@ -33,7 +26,6 @@
 import org.elasticsearch.hadoop.serialization.builder.JdkValueWriter;
 import org.elasticsearch.hadoop.serialization.bulk.BulkCommand;
 import org.elasticsearch.hadoop.serialization.bulk.BulkCommands;
-import org.elasticsearch.hadoop.serialization.bulk.DeleteBulkFactory;
 import org.elasticsearch.hadoop.util.BytesArray;
 import org.elasticsearch.hadoop.util.EsMajorVersion;
 import org.elasticsearch.hadoop.util.StringUtils;
@@ -44,8 +36,12 @@
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
-import static org.junit.Assert.assertEquals;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assume.assumeFalse;
 import static org.junit.Assume.assumeTrue;
 
@@ -84,7 +80,7 @@ public static Collection<Object[]> data() {
         for (EsMajorVersion version : versions) {
             for (boolean asJson : asJsons) {
                 for (String operation : operations) {
-                    result.add(new Object[]{ operation, asJson, version });
+                    result.add(new Object[]{operation, asJson, version});
                 }
             }
         }
@@ -317,7 +313,7 @@ public void testUpdateOnlyInlineScript5X() throws Exception {
         create(set).write(data).copyTo(ba);
 
         String result = "{\"" + operation + "\":{\"_id\":2,\"_retry_on_conflict\":3}}\n" +
-                "{\"script\":{\"inline\":\"counter = 3\",\"lang\":\"groovy\"}}\n";
+                        "{\"script\":{\"inline\":\"counter = 3\",\"lang\":\"groovy\"}}\n";
         assertEquals(result, ba.toString());
     }
 
@@ -447,7 +443,7 @@ public void testUpdateOnlyParamInlineScript5X() throws Exception {
 
         String result = "{\"" + operation + "\":{\"_id\":1}}\n" +
-                "{\"script\":{\"inline\":\"counter = param1; anothercounter = param2\",\"lang\":\"groovy\",\"params\":{\"param1\":1,\"param2\":1}}}\n";
+                        "{\"script\":{\"inline\":\"counter = param1; anothercounter = param2\",\"lang\":\"groovy\",\"params\":{\"param1\":1,\"param2\":1}}}\n";
 
         assertEquals(result, ba.toString());
     }
 
@@ -523,11 +519,7 @@ private Settings settings() {
         set.setInternalVersion(version);
         set.setProperty(ConfigurationOptions.ES_INPUT_JSON, Boolean.toString(jsonInput));
 
-        if (isDeleteOP()) {
-            InitializationUtils.setValueWriterIfNotSet(set, DeleteBulkFactory.NoDataWriter.class, null);
-        } else {
-            InitializationUtils.setValueWriterIfNotSet(set, JdkValueWriter.class, null);
-        }
+        InitializationUtils.setValueWriterIfNotSet(set, JdkValueWriter.class, null);
         InitializationUtils.setFieldExtractorIfNotSet(set, MapFieldExtractor.class, null);
         InitializationUtils.setBytesConverterIfNeeded(set, JdkBytesConverter.class, null);
         InitializationUtils.setUserProviderIfNotSet(set, HadoopUserProvider.class, null);
@@ -577,3 +569,4 @@ private boolean isDeleteOP() {
         return ConfigurationOptions.ES_OPERATION_DELETE.equals(operation);
     }
 }
+
diff --git a/mr/src/test/java/org/elasticsearch/hadoop/serialization/NoDataWriterTypeToJsonTest.java b/mr/src/test/java/org/elasticsearch/hadoop/serialization/NoDataWriterTypeToJsonTest.java
deleted file mode 100644
index fd796f779..000000000
--- a/mr/src/test/java/org/elasticsearch/hadoop/serialization/NoDataWriterTypeToJsonTest.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.hadoop.serialization;
-
-import org.apache.hadoop.io.*;
-import org.elasticsearch.hadoop.mr.LinkedMapWritable;
-import org.elasticsearch.hadoop.serialization.builder.ContentBuilder;
-import org.elasticsearch.hadoop.serialization.bulk.DeleteBulkFactory;
-import org.elasticsearch.hadoop.util.BytesArray;
-import org.elasticsearch.hadoop.util.FastByteArrayOutputStream;
-import org.junit.*;
-
-@SuppressWarnings("deprecation")
-public class NoDataWriterTypeToJsonTest {
-    private static FastByteArrayOutputStream out;
-
-    @BeforeClass
-    public static void beforeClass() {
-        out = new FastByteArrayOutputStream();
-    }
-
-    @Before
-    public void start() {
-        out.reset();
-    }
-
-    @After
-    public void after() {
-        out.reset();
-    }
-
-    @AfterClass
-    public static void afterClass() {
-        out = null;
-    }
-
-    @Test
-    public void testNull() {
-        writableTypeToJson(null, new BytesArray("null"));
-    }
-
-    @Test
-    public void testNullWritable() throws Exception {
-        writableTypeToJson(NullWritable.get(), new BytesArray("null"));
-    }
-
-    @Test
-    public void testString() {
-        writableTypeToJson(new Text("some text"), new BytesArray("\"some text\"".getBytes()));
-    }
-
-    @Test
-    public void testUTF8() {
-        writableTypeToJson(new UTF8("some utf8"), new BytesArray("\"some utf8\"".getBytes()));
-    }
-
-    @Test
-    public void testInteger() {
-        writableTypeToJson(new IntWritable(Integer.MAX_VALUE), new BytesArray(Integer.toString(Integer.MAX_VALUE)));
-    }
-
-    @Test
-    public void testLong() {
-        writableTypeToJson(new LongWritable(Long.MAX_VALUE), new BytesArray(Long.toString(Long.MAX_VALUE)));
-    }
-
-    @Test
-    public void testVInteger() {
-        writableTypeToJson(new VIntWritable(Integer.MAX_VALUE), new BytesArray(Integer.toString(Integer.MAX_VALUE)));
-    }
-
-    @Test
-    public void testVLong() {
-        writableTypeToJson(new VLongWritable(Long.MAX_VALUE), new BytesArray(Long.toString(Long.MAX_VALUE)));
-    }
-
-    @Test
-    public void testDouble() {
-        writableTypeToJson(new DoubleWritable(Double.MAX_VALUE), new BytesArray(Double.toString(Double.MAX_VALUE)));
-    }
-
-    @Test
-    public void testFloat() {
-        writableTypeToJson(new FloatWritable(Float.MAX_VALUE), new BytesArray(Float.toString(Float.MAX_VALUE)));
-    }
-
-    @Test
-    public void testBoolean() {
-        writableTypeToJson(new BooleanWritable(Boolean.TRUE), new BytesArray("true"));
-    }
-
-    @Test
-    public void testMD5Hash() {
-        writableTypeToJson(MD5Hash.digest("md5hash"), new BytesArray("\"f9d08276bc85d30d578e8883f3c7e843\"".getBytes()));
-    }
-
-    @Test
-    public void testByte() {
-        writableTypeToJson(new ByteWritable(Byte.MAX_VALUE), new BytesArray(Byte.toString(Byte.MAX_VALUE)));
-    }
-
-    @Test
-    public void testByteArray() {
-        writableTypeToJson(new BytesWritable("byte array".getBytes()), new BytesArray("\"Ynl0ZSBhcnJheQ==\""));
-    }
-
-    @Test
-    public void testArray() {
-        writableTypeToJson(new ArrayWritable(new String[]{"one", "two"}), new BytesArray(""));
-    }
-
-    @Test
-    public void testMap() {
-        LinkedMapWritable map = new LinkedMapWritable();
-        map.put(new Text("key"), new IntWritable(1));
-        map.put(new BooleanWritable(Boolean.TRUE), new ArrayWritable(new String[]{"one", "two"}));
-        writableTypeToJson(map, new BytesArray(""));
-    }
-
-    private void writableTypeToJson(Writable obj, BytesArray expected) {
-        ContentBuilder.generate(out, new DeleteBulkFactory.NoDataWriter()).value(obj).flush().close();
-        Assert.assertEquals(expected.toString(), out.bytes().toString());
-    }
-}
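
Usage sketch (not part of the patch): with this change, `AbstractBulkFactory.createBulk()` short-circuits to the new `DeleteTemplatedBulk` whenever the configured write operation equals `ConfigurationOptions.ES_OPERATION_DELETE`, so each record serializes to a single action-and-metadata line with no source document, e.g. `{"delete":{"_id":"1"}}`. Below is a minimal, hypothetical job configuration that would exercise this path; the property keys are the standard es-hadoop `ConfigurationOptions` keys, while the index and field names are made up for illustration:

```java
import org.apache.hadoop.conf.Configuration;

public class DeleteJobConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Illustrative index/type name; any existing resource works.
        conf.set("es.resource", "my-index/my-type");
        // Routes writes through the delete path added above
        // (ConfigurationOptions.ES_WRITE_OPERATION set to ES_OPERATION_DELETE).
        conf.set("es.write.operation", "delete");
        // Deletes carry no document body, so the _id must be extracted from a
        // field of each incoming record ("id" is a hypothetical field name).
        conf.set("es.mapping.id", "id");
    }
}
```

Since `DeleteTemplatedBulk.write()` never calls `doWriteObject()`, no value writer runs for deletes at all, which is why the removal of `NoDataWriter` (and of the copied `Settings` carrying it) is safe.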