From 146de9f1dbd1f8d9cde53233eb0350092a6da8f7 Mon Sep 17 00:00:00 2001 From: vincentarnaud90 <37067128+vincentarnaud90@users.noreply.github.com> Date: Mon, 22 Jul 2019 13:05:27 +0200 Subject: [PATCH 1/4] Implement delete operation (#1) Implement delete operation and tests --- .../elasticsearch/hadoop/cfg/Settings.java | 5 + .../hadoop/rest/InitializationUtils.java | 19 ++- .../serialization/bulk/BulkCommands.java | 5 +- .../serialization/bulk/DeleteBulkFactory.java | 99 +++++++++++++ .../hadoop/rest/InitializationUtilsTest.java | 41 +++++- .../hadoop/serialization/CommandTest.java | 130 ++++++++++++------ .../NoDataWriterTypeToJsonTest.java | 122 ++++++++++++++++ 7 files changed, 369 insertions(+), 52 deletions(-) create mode 100644 mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/DeleteBulkFactory.java create mode 100644 mr/src/test/java/org/elasticsearch/hadoop/serialization/NoDataWriterTypeToJsonTest.java diff --git a/mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java b/mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java index 04cdbd076..317db80ae 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java @@ -221,6 +221,11 @@ public String getSerializerValueWriterClassName() { return getProperty(ES_SERIALIZATION_WRITER_VALUE_CLASS); } + public Settings setSerializerValueWriterClassName(String className) { + setProperty(ES_SERIALIZATION_WRITER_VALUE_CLASS, className); + return this; + } + public String getSerializerBytesConverterClassName() { return getProperty(ES_SERIALIZATION_WRITER_BYTES_CLASS); } diff --git a/mr/src/main/java/org/elasticsearch/hadoop/rest/InitializationUtils.java b/mr/src/main/java/org/elasticsearch/hadoop/rest/InitializationUtils.java index a7aa0f4d8..f08a21d1a 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/rest/InitializationUtils.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/rest/InitializationUtils.java @@ -152,7 
+152,7 @@ public static void filterNonDataNodesIfNeeded(Settings settings, Log log) { } RestClient bootstrap = new RestClient(settings); - try { + try { String message = "No data nodes with HTTP-enabled available"; List dataNodes = bootstrap.getHttpDataNodes(); if (dataNodes.isEmpty()) { @@ -253,10 +253,18 @@ public static void validateSettings(Settings settings) { Assert.isTrue(settings.getMappingExcludes().isEmpty(), "When writing data as JSON, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out..."); } + //check the configuration is coherent in order to use the delete operation + if (ConfigurationOptions.ES_OPERATION_DELETE.equals(settings.getOperation())) { + Assert.isTrue(!settings.getInputAsJson(), "When using delete operation, providing data as JSON is not coherent because this operation does not need document as a payload. This is most likely not what the user intended. Bailing out..."); + Assert.isTrue(settings.getMappingIncludes().isEmpty(), "When using delete operation, the field inclusion feature is ignored. This is most likely not what the user intended. Bailing out..."); + Assert.isTrue(settings.getMappingExcludes().isEmpty(), "When using delete operation, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out..."); + Assert.isTrue(settings.getMappingId() != null && !StringUtils.EMPTY.equals(settings.getMappingId()), "When using delete operation, the property " + ConfigurationOptions.ES_MAPPING_ID + " must be set and must not be empty since we need the document id in order to delete it. 
Bailing out..."); + } + // Check to make sure user doesn't specify more than one script type boolean hasScript = false; String[] scripts = {settings.getUpdateScriptInline(), settings.getUpdateScriptFile(), settings.getUpdateScriptStored()}; - for (String script: scripts) { + for (String script : scripts) { boolean isSet = StringUtils.hasText(script); Assert.isTrue((hasScript && isSet) == false, "Multiple scripts are specified. Please specify only one via [es.update.script.inline], [es.update.script.file], or [es.update.script.stored]"); hasScript = hasScript || isSet; @@ -291,7 +299,8 @@ public static void validateSettingsForWriting(Settings settings) { throw new EsHadoopIllegalArgumentException("Cannot use TTL on index/update requests in ES 6.x and " + "above. Please remove the [" + ConfigurationOptions.ES_MAPPING_TTL + "] setting."); } - } else { + } + else { if (StringUtils.hasText(settings.getMappingTtl())) { LOG.warn("Setting [" + ConfigurationOptions.ES_MAPPING_TTL + "] is deprecated! Support for [ttl] on " + "indexing and update requests has been removed in ES 6.x and above!"); @@ -378,7 +387,7 @@ private static void doCheckIndexExistence(Settings settings, RestRepository clie settings.getResourceWrite(), ConfigurationOptions.ES_INDEX_AUTO_CREATE, settings.getIndexAutoCreate())); } } - + public static boolean setMetadataExtractorIfNotSet(Settings settings, Class clazz, Log log) { if (!StringUtils.hasText(settings.getMappingMetadataExtractorClassName())) { Log logger = (log != null ? 
log : LogFactory.getLog(clazz)); @@ -468,4 +477,4 @@ public static boolean setUserProviderIfNotSet(Settings settings, Class { + + @Override + public Result write(Object writable, Generator generator) { + ///delete doesn't require any content but it needs to extract metadata associated to a document + if (writable == null || writable instanceof NullWritable) { + generator.writeNull(); + } + else if (writable instanceof Text) { + Text text = (Text) writable; + generator.writeUTF8String(text.getBytes(), 0, text.getLength()); + } + else if (writable instanceof UTF8) { + UTF8 utf8 = (UTF8) writable; + generator.writeUTF8String(utf8.getBytes(), 0, utf8.getLength()); + } + else if (writable instanceof IntWritable) { + generator.writeNumber(((IntWritable) writable).get()); + } + else if (writable instanceof LongWritable) { + generator.writeNumber(((LongWritable) writable).get()); + } + else if (writable instanceof VLongWritable) { + generator.writeNumber(((VLongWritable) writable).get()); + } + else if (writable instanceof VIntWritable) { + generator.writeNumber(((VIntWritable) writable).get()); + } + else if (writable instanceof ByteWritable) { + generator.writeNumber(((ByteWritable) writable).get()); + } + else if (writable instanceof DoubleWritable) { + generator.writeNumber(((DoubleWritable) writable).get()); + } + else if (writable instanceof FloatWritable) { + generator.writeNumber(((FloatWritable) writable).get()); + } + else if (writable instanceof BooleanWritable) { + generator.writeBoolean(((BooleanWritable) writable).get()); + } + else if (writable instanceof BytesWritable) { + BytesWritable bw = (BytesWritable) writable; + generator.writeBinary(bw.getBytes(), 0, bw.getLength()); + } + else if (writable instanceof MD5Hash) { + generator.writeString(writable.toString()); + } + return Result.SUCCESFUL(); + } + } + + public DeleteBulkFactory(Settings settings, MetadataExtractor metaExtractor, EsMajorVersion version) { + // we only want a specific serializer for 
this particular bulk factory + super(settings.copy().setSerializerValueWriterClassName(NoDataWriter.class.getName()), metaExtractor, version); + } + + @Override + protected String getOperation() { + return ConfigurationOptions.ES_OPERATION_DELETE; + } + + @Override + protected void writeObjectEnd(List list) { + // skip adding new-line for each entity as delete doesn't need entity output + list.add(StringUtils.EMPTY); + } +} diff --git a/mr/src/test/java/org/elasticsearch/hadoop/rest/InitializationUtilsTest.java b/mr/src/test/java/org/elasticsearch/hadoop/rest/InitializationUtilsTest.java index cf5866847..21d0f9e0f 100644 --- a/mr/src/test/java/org/elasticsearch/hadoop/rest/InitializationUtilsTest.java +++ b/mr/src/test/java/org/elasticsearch/hadoop/rest/InitializationUtilsTest.java @@ -136,4 +136,43 @@ public void testValidateWriteV6PlusTimestampRemoved() throws Exception { set.setProperty(ES_MAPPING_TIMESTAMP, "1000"); validateSettingsForWriting(set); } -} \ No newline at end of file + + @Test(expected = EsHadoopIllegalArgumentException.class) + public void testValidateDeleteOperationVsInputAsJson() { + Settings set = new TestSettings(); + set.setProperty(ES_WRITE_OPERATION, "delete"); + set.setProperty(ES_INPUT_JSON, "true"); + validateSettings(set); + } + + @Test(expected = EsHadoopIllegalArgumentException.class) + public void testValidateDeleteOperationVsIncludeFields() { + Settings set = new TestSettings(); + set.setProperty(ES_WRITE_OPERATION, "delete"); + set.setProperty(ES_MAPPING_INCLUDE, "field"); + validateSettings(set); + } + + @Test(expected = EsHadoopIllegalArgumentException.class) + public void testValidateDeleteOperationVsExcludeFields() { + Settings set = new TestSettings(); + set.setProperty(ES_WRITE_OPERATION, "delete"); + set.setProperty(ES_MAPPING_EXCLUDE, "field"); + validateSettings(set); + } + + @Test(expected = EsHadoopIllegalArgumentException.class) + public void testValidateDeleteOperationVsIdNotSet() { + Settings set = new 
TestSettings(); + set.setProperty(ES_WRITE_OPERATION, "delete"); + validateSettings(set); + } + + @Test(expected = EsHadoopIllegalArgumentException.class) + public void testValidateDeleteOperationVsEmptyId() { + Settings set = new TestSettings(); + set.setProperty(ES_WRITE_OPERATION, "delete"); + set.setProperty(ES_MAPPING_ID, ""); + validateSettings(set); + } +} diff --git a/mr/src/test/java/org/elasticsearch/hadoop/serialization/CommandTest.java b/mr/src/test/java/org/elasticsearch/hadoop/serialization/CommandTest.java index e476a5496..311eb0251 100644 --- a/mr/src/test/java/org/elasticsearch/hadoop/serialization/CommandTest.java +++ b/mr/src/test/java/org/elasticsearch/hadoop/serialization/CommandTest.java @@ -31,6 +31,7 @@ import org.elasticsearch.hadoop.serialization.builder.JdkValueWriter; import org.elasticsearch.hadoop.serialization.bulk.BulkCommand; import org.elasticsearch.hadoop.serialization.bulk.BulkCommands; +import org.elasticsearch.hadoop.serialization.bulk.DeleteBulkFactory; import org.elasticsearch.hadoop.util.BytesArray; import org.elasticsearch.hadoop.util.EsMajorVersion; import org.elasticsearch.hadoop.util.StringUtils; @@ -64,43 +65,55 @@ public static Collection data() { throw new IllegalStateException("CommandTest needs new version updates."); } - return Arrays.asList(new Object[][] { - { ConfigurationOptions.ES_OPERATION_INDEX, false, EsMajorVersion.V_1_X }, - { ConfigurationOptions.ES_OPERATION_CREATE, false, EsMajorVersion.V_1_X }, - { ConfigurationOptions.ES_OPERATION_UPDATE, false, EsMajorVersion.V_1_X }, - { ConfigurationOptions.ES_OPERATION_INDEX, true, EsMajorVersion.V_1_X }, - { ConfigurationOptions.ES_OPERATION_CREATE, true, EsMajorVersion.V_1_X }, - { ConfigurationOptions.ES_OPERATION_UPDATE, true, EsMajorVersion.V_1_X }, - { ConfigurationOptions.ES_OPERATION_INDEX, false, EsMajorVersion.V_2_X }, - { ConfigurationOptions.ES_OPERATION_CREATE, false, EsMajorVersion.V_2_X }, - { ConfigurationOptions.ES_OPERATION_UPDATE, false, 
EsMajorVersion.V_2_X }, - { ConfigurationOptions.ES_OPERATION_INDEX, true, EsMajorVersion.V_2_X }, - { ConfigurationOptions.ES_OPERATION_CREATE, true, EsMajorVersion.V_2_X }, - { ConfigurationOptions.ES_OPERATION_UPDATE, true, EsMajorVersion.V_2_X }, - { ConfigurationOptions.ES_OPERATION_INDEX, false, EsMajorVersion.V_5_X }, - { ConfigurationOptions.ES_OPERATION_CREATE, false, EsMajorVersion.V_5_X }, - { ConfigurationOptions.ES_OPERATION_UPDATE, false, EsMajorVersion.V_5_X }, - { ConfigurationOptions.ES_OPERATION_INDEX, true, EsMajorVersion.V_5_X }, - { ConfigurationOptions.ES_OPERATION_CREATE, true, EsMajorVersion.V_5_X }, - { ConfigurationOptions.ES_OPERATION_UPDATE, true, EsMajorVersion.V_5_X }, - { ConfigurationOptions.ES_OPERATION_INDEX, false, EsMajorVersion.V_6_X }, - { ConfigurationOptions.ES_OPERATION_CREATE, false, EsMajorVersion.V_6_X }, - { ConfigurationOptions.ES_OPERATION_UPDATE, false, EsMajorVersion.V_6_X }, - { ConfigurationOptions.ES_OPERATION_INDEX, true, EsMajorVersion.V_6_X }, - { ConfigurationOptions.ES_OPERATION_CREATE, true, EsMajorVersion.V_6_X }, - { ConfigurationOptions.ES_OPERATION_UPDATE, true, EsMajorVersion.V_6_X }, - { ConfigurationOptions.ES_OPERATION_INDEX, false, EsMajorVersion.V_7_X }, - { ConfigurationOptions.ES_OPERATION_CREATE, false, EsMajorVersion.V_7_X }, - { ConfigurationOptions.ES_OPERATION_UPDATE, false, EsMajorVersion.V_7_X }, - { ConfigurationOptions.ES_OPERATION_INDEX, true, EsMajorVersion.V_7_X }, - { ConfigurationOptions.ES_OPERATION_CREATE, true, EsMajorVersion.V_7_X }, - { ConfigurationOptions.ES_OPERATION_UPDATE, true, EsMajorVersion.V_7_X }, - { ConfigurationOptions.ES_OPERATION_INDEX, false, EsMajorVersion.V_8_X }, - { ConfigurationOptions.ES_OPERATION_CREATE, false, EsMajorVersion.V_8_X }, - { ConfigurationOptions.ES_OPERATION_UPDATE, false, EsMajorVersion.V_8_X }, - { ConfigurationOptions.ES_OPERATION_INDEX, true, EsMajorVersion.V_8_X }, - { ConfigurationOptions.ES_OPERATION_CREATE, true, EsMajorVersion.V_8_X 
}, - { ConfigurationOptions.ES_OPERATION_UPDATE, true, EsMajorVersion.V_8_X } + return Arrays.asList(new Object[][]{ + {ConfigurationOptions.ES_OPERATION_INDEX, false, EsMajorVersion.V_1_X}, + {ConfigurationOptions.ES_OPERATION_CREATE, false, EsMajorVersion.V_1_X}, + {ConfigurationOptions.ES_OPERATION_UPDATE, false, EsMajorVersion.V_1_X}, + {ConfigurationOptions.ES_OPERATION_DELETE, false, EsMajorVersion.V_1_X}, + {ConfigurationOptions.ES_OPERATION_INDEX, true, EsMajorVersion.V_1_X}, + {ConfigurationOptions.ES_OPERATION_CREATE, true, EsMajorVersion.V_1_X}, + {ConfigurationOptions.ES_OPERATION_UPDATE, true, EsMajorVersion.V_1_X}, + {ConfigurationOptions.ES_OPERATION_DELETE, true, EsMajorVersion.V_1_X}, + {ConfigurationOptions.ES_OPERATION_INDEX, false, EsMajorVersion.V_2_X}, + {ConfigurationOptions.ES_OPERATION_CREATE, false, EsMajorVersion.V_2_X}, + {ConfigurationOptions.ES_OPERATION_UPDATE, false, EsMajorVersion.V_2_X}, + {ConfigurationOptions.ES_OPERATION_DELETE, false, EsMajorVersion.V_2_X}, + {ConfigurationOptions.ES_OPERATION_INDEX, true, EsMajorVersion.V_2_X}, + {ConfigurationOptions.ES_OPERATION_CREATE, true, EsMajorVersion.V_2_X}, + {ConfigurationOptions.ES_OPERATION_UPDATE, true, EsMajorVersion.V_2_X}, + {ConfigurationOptions.ES_OPERATION_DELETE, true, EsMajorVersion.V_2_X}, + {ConfigurationOptions.ES_OPERATION_INDEX, false, EsMajorVersion.V_5_X}, + {ConfigurationOptions.ES_OPERATION_CREATE, false, EsMajorVersion.V_5_X}, + {ConfigurationOptions.ES_OPERATION_UPDATE, false, EsMajorVersion.V_5_X}, + {ConfigurationOptions.ES_OPERATION_DELETE, false, EsMajorVersion.V_5_X}, + {ConfigurationOptions.ES_OPERATION_INDEX, true, EsMajorVersion.V_5_X}, + {ConfigurationOptions.ES_OPERATION_CREATE, true, EsMajorVersion.V_5_X}, + {ConfigurationOptions.ES_OPERATION_UPDATE, true, EsMajorVersion.V_5_X}, + {ConfigurationOptions.ES_OPERATION_DELETE, true, EsMajorVersion.V_5_X}, + {ConfigurationOptions.ES_OPERATION_INDEX, false, EsMajorVersion.V_6_X}, + 
{ConfigurationOptions.ES_OPERATION_CREATE, false, EsMajorVersion.V_6_X}, + {ConfigurationOptions.ES_OPERATION_UPDATE, false, EsMajorVersion.V_6_X}, + {ConfigurationOptions.ES_OPERATION_DELETE, false, EsMajorVersion.V_6_X}, + {ConfigurationOptions.ES_OPERATION_INDEX, true, EsMajorVersion.V_6_X}, + {ConfigurationOptions.ES_OPERATION_CREATE, true, EsMajorVersion.V_6_X}, + {ConfigurationOptions.ES_OPERATION_UPDATE, true, EsMajorVersion.V_6_X}, + {ConfigurationOptions.ES_OPERATION_DELETE, true, EsMajorVersion.V_6_X}, + {ConfigurationOptions.ES_OPERATION_INDEX, false, EsMajorVersion.V_7_X}, + {ConfigurationOptions.ES_OPERATION_CREATE, false, EsMajorVersion.V_7_X}, + {ConfigurationOptions.ES_OPERATION_UPDATE, false, EsMajorVersion.V_7_X}, + {ConfigurationOptions.ES_OPERATION_DELETE, false, EsMajorVersion.V_7_X}, + {ConfigurationOptions.ES_OPERATION_INDEX, true, EsMajorVersion.V_7_X}, + {ConfigurationOptions.ES_OPERATION_CREATE, true, EsMajorVersion.V_7_X}, + {ConfigurationOptions.ES_OPERATION_UPDATE, true, EsMajorVersion.V_7_X}, + {ConfigurationOptions.ES_OPERATION_DELETE, true, EsMajorVersion.V_7_X}, + {ConfigurationOptions.ES_OPERATION_INDEX, false, EsMajorVersion.V_8_X}, + {ConfigurationOptions.ES_OPERATION_CREATE, false, EsMajorVersion.V_8_X}, + {ConfigurationOptions.ES_OPERATION_UPDATE, false, EsMajorVersion.V_8_X}, + {ConfigurationOptions.ES_OPERATION_DELETE, false, EsMajorVersion.V_8_X}, + {ConfigurationOptions.ES_OPERATION_INDEX, true, EsMajorVersion.V_8_X}, + {ConfigurationOptions.ES_OPERATION_CREATE, true, EsMajorVersion.V_8_X}, + {ConfigurationOptions.ES_OPERATION_UPDATE, true, EsMajorVersion.V_8_X}, + {ConfigurationOptions.ES_OPERATION_DELETE, true, EsMajorVersion.V_8_X} }); } @@ -127,6 +140,7 @@ public void prepare() { @Test public void testNoHeader() throws Exception { assumeFalse(ConfigurationOptions.ES_OPERATION_UPDATE.equals(operation)); + assumeFalse(ConfigurationOptions.ES_OPERATION_DELETE.equals(operation)); create(settings()).write(data).copyTo(ba); 
String result = prefix() + "}}" + map(); assertEquals(result, ba.toString()); @@ -135,6 +149,7 @@ public void testNoHeader() throws Exception { @Test // check user friendliness and escape the string if needed public void testConstantId() throws Exception { + assumeFalse(isDeleteOP() && jsonInput); Settings settings = settings(); noId = true; settings.setProperty(ConfigurationOptions.ES_MAPPING_ID, ""); @@ -148,6 +163,7 @@ public void testConstantId() throws Exception { @Test public void testParent() throws Exception { assumeTrue(version.onOrBefore(EsMajorVersion.V_6_X)); + assumeFalse(isDeleteOP() && jsonInput); Settings settings = settings(); settings.setProperty(ConfigurationOptions.ES_MAPPING_PARENT, "<5>"); @@ -159,6 +175,7 @@ public void testParent() throws Exception { @Test public void testParent7X() throws Exception { assumeTrue(version.onOrAfter(EsMajorVersion.V_7_X)); + assumeFalse(isDeleteOP() && jsonInput); Settings settings = settings(); settings.setProperty(ConfigurationOptions.ES_MAPPING_PARENT, "<5>"); @@ -170,6 +187,7 @@ public void testParent7X() throws Exception { @Test public void testVersion() throws Exception { assumeTrue(version.onOrBefore(EsMajorVersion.V_6_X)); + assumeFalse(isDeleteOP() && jsonInput); Settings settings = settings(); settings.setProperty(ConfigurationOptions.ES_MAPPING_VERSION, "<3>"); @@ -181,6 +199,7 @@ public void testVersion() throws Exception { @Test public void testVersion7X() throws Exception { assumeTrue(version.onOrAfter(EsMajorVersion.V_7_X)); + assumeFalse(isDeleteOP() && jsonInput); Settings settings = settings(); settings.setProperty(ConfigurationOptions.ES_MAPPING_VERSION, "<3>"); @@ -191,6 +210,7 @@ public void testVersion7X() throws Exception { @Test public void testTtl() throws Exception { + assumeFalse(isDeleteOP() && jsonInput); Settings settings = settings(); settings.setProperty(ConfigurationOptions.ES_MAPPING_TTL, "<2>"); @@ -201,6 +221,7 @@ public void testTtl() throws Exception { @Test public void 
testTimestamp() throws Exception { + assumeFalse(isDeleteOP() && jsonInput); Settings settings = settings(); settings.setProperty(ConfigurationOptions.ES_MAPPING_TIMESTAMP, "<3>"); create(settings).write(data).copyTo(ba); @@ -211,6 +232,7 @@ public void testTimestamp() throws Exception { @Test public void testRouting() throws Exception { assumeTrue(version.onOrBefore(EsMajorVersion.V_6_X)); + assumeFalse(isDeleteOP() && jsonInput); Settings settings = settings(); settings.setProperty(ConfigurationOptions.ES_MAPPING_ROUTING, "<4>"); @@ -222,6 +244,7 @@ public void testRouting() throws Exception { @Test public void testRouting7X() throws Exception { assumeTrue(version.onOrAfter(EsMajorVersion.V_7_X)); + assumeFalse(isDeleteOP() && jsonInput); Settings settings = settings(); settings.setProperty(ConfigurationOptions.ES_MAPPING_ROUTING, "<4>"); @@ -233,6 +256,7 @@ public void testRouting7X() throws Exception { @Test public void testAll() throws Exception { assumeTrue(version.onOrBefore(EsMajorVersion.V_6_X)); + assumeFalse(isDeleteOP() && jsonInput); Settings settings = settings(); settings.setProperty(ConfigurationOptions.ES_MAPPING_ID, "n"); settings.setProperty(ConfigurationOptions.ES_MAPPING_TTL, "<2>"); @@ -246,6 +270,7 @@ public void testAll() throws Exception { @Test public void testAll7X() throws Exception { assumeTrue(version.onOrAfter(EsMajorVersion.V_7_X)); + assumeFalse(isDeleteOP() && jsonInput); Settings settings = settings(); settings.setProperty(ConfigurationOptions.ES_MAPPING_ID, "n"); settings.setProperty(ConfigurationOptions.ES_MAPPING_TTL, "<2>"); @@ -258,10 +283,12 @@ public void testAll7X() throws Exception { @Test public void testIdPattern() throws Exception { + assumeFalse(isDeleteOP() && jsonInput); Settings settings = settings(); if (version.onOrAfter(EsMajorVersion.V_8_X)) { settings.setResourceWrite("{n}"); - } else { + } + else { settings.setResourceWrite("foo/{n}"); } @@ -269,7 +296,8 @@ public void testIdPattern() throws Exception { 
String header; if (version.onOrAfter(EsMajorVersion.V_8_X)) { header = "{\"_index\":\"1\"" + (isUpdateOp() ? ",\"_id\":2" : "") + "}"; - } else { + } + else { header = "{\"_index\":\"foo\",\"_type\":\"1\"" + (isUpdateOp() ? ",\"_id\":2" : "") + "}"; } String result = "{\"" + operation + "\":" + header + "}" + map(); @@ -317,7 +345,7 @@ public void testUpdateOnlyInlineScript5X() throws Exception { create(set).write(data).copyTo(ba); String result = "{\"" + operation + "\":{\"_id\":2,\"_retry_on_conflict\":3}}\n" + - "{\"script\":{\"inline\":\"counter = 3\",\"lang\":\"groovy\"}}\n"; + "{\"script\":{\"inline\":\"counter = 3\",\"lang\":\"groovy\"}}\n"; assertEquals(result, ba.toString()); } @@ -447,7 +475,7 @@ public void testUpdateOnlyParamInlineScript5X() throws Exception { String result = "{\"" + operation + "\":{\"_id\":1}}\n" + - "{\"script\":{\"inline\":\"counter = param1; anothercounter = param2\",\"lang\":\"groovy\",\"params\":{\"param1\":1,\"param2\":1}}}\n"; + "{\"script\":{\"inline\":\"counter = param1; anothercounter = param2\",\"lang\":\"groovy\",\"params\":{\"param1\":1,\"param2\":1}}}\n"; assertEquals(result, ba.toString()); } @@ -523,8 +551,12 @@ private Settings settings() { set.setInternalVersion(version); set.setProperty(ConfigurationOptions.ES_INPUT_JSON, Boolean.toString(jsonInput)); - - InitializationUtils.setValueWriterIfNotSet(set, JdkValueWriter.class, null); + if (isDeleteOP()) { + InitializationUtils.setValueWriterIfNotSet(set, DeleteBulkFactory.NoDataWriter.class, null); + } + else { + InitializationUtils.setValueWriterIfNotSet(set, JdkValueWriter.class, null); + } InitializationUtils.setFieldExtractorIfNotSet(set, MapFieldExtractor.class, null); InitializationUtils.setBytesConverterIfNeeded(set, JdkBytesConverter.class, null); InitializationUtils.setUserProviderIfNotSet(set, HadoopUserProvider.class, null); @@ -532,7 +564,8 @@ private Settings settings() { set.setProperty(ConfigurationOptions.ES_WRITE_OPERATION, operation); if 
(version.onOrAfter(EsMajorVersion.V_8_X)) { set.setResourceWrite("foo"); - } else { + } + else { set.setResourceWrite("foo/bar"); } if (isUpdateOp()) { @@ -550,6 +583,9 @@ private String prefix() { } private String map() { + if (isDeleteOP()) { + return "\n"; + } StringBuilder sb = new StringBuilder("\n{"); if (isUpdateOp()) { sb.append("\"doc\":{"); @@ -566,4 +602,8 @@ private String map() { private boolean isUpdateOp() { return ConfigurationOptions.ES_OPERATION_UPDATE.equals(operation); } -} \ No newline at end of file + + private boolean isDeleteOP() { + return ConfigurationOptions.ES_OPERATION_DELETE.equals(operation); + } +} diff --git a/mr/src/test/java/org/elasticsearch/hadoop/serialization/NoDataWriterTypeToJsonTest.java b/mr/src/test/java/org/elasticsearch/hadoop/serialization/NoDataWriterTypeToJsonTest.java new file mode 100644 index 000000000..846d09248 --- /dev/null +++ b/mr/src/test/java/org/elasticsearch/hadoop/serialization/NoDataWriterTypeToJsonTest.java @@ -0,0 +1,122 @@ +package org.elasticsearch.hadoop.serialization; + +import org.apache.hadoop.io.*; +import org.elasticsearch.hadoop.mr.LinkedMapWritable; +import org.elasticsearch.hadoop.serialization.builder.ContentBuilder; +import org.elasticsearch.hadoop.serialization.bulk.DeleteBulkFactory; +import org.elasticsearch.hadoop.util.BytesArray; +import org.elasticsearch.hadoop.util.FastByteArrayOutputStream; +import org.junit.*; + +@SuppressWarnings("deprecation") +public class NoDataWriterTypeToJsonTest { + private static FastByteArrayOutputStream out; + + @BeforeClass + public static void beforeClass() { + out = new FastByteArrayOutputStream(); + } + + @Before + public void start() { + out.reset(); + } + + @After + public void after() { + out.reset(); + } + + @AfterClass + public static void afterClass() { + out = null; + } + + @Test + public void testNull() { + writableTypeToJson(null, new BytesArray("null")); + } + + @Test + public void testNullWritable() throws Exception { + 
writableTypeToJson(NullWritable.get(), new BytesArray("null")); + } + + @Test + public void testString() { + writableTypeToJson(new Text("some text"), new BytesArray("\"some text\"".getBytes())); + } + + @Test + public void testUTF8() { + writableTypeToJson(new UTF8("some utf8"), new BytesArray("\"some utf8\"".getBytes())); + } + + @Test + public void testInteger() { + writableTypeToJson(new IntWritable(Integer.MAX_VALUE), new BytesArray(Integer.toString(Integer.MAX_VALUE))); + } + + @Test + public void testLong() { + writableTypeToJson(new LongWritable(Long.MAX_VALUE), new BytesArray(Long.toString(Long.MAX_VALUE))); + } + + @Test + public void testVInteger() { + writableTypeToJson(new VIntWritable(Integer.MAX_VALUE), new BytesArray(Integer.toString(Integer.MAX_VALUE))); + } + + @Test + public void testVLong() { + writableTypeToJson(new VLongWritable(Long.MAX_VALUE), new BytesArray(Long.toString(Long.MAX_VALUE))); + } + + @Test + public void testDouble() { + writableTypeToJson(new DoubleWritable(Double.MAX_VALUE), new BytesArray(Double.toString(Double.MAX_VALUE))); + } + + @Test + public void testFloat() { + writableTypeToJson(new FloatWritable(Float.MAX_VALUE), new BytesArray(Float.toString(Float.MAX_VALUE))); + } + + @Test + public void testBoolean() { + writableTypeToJson(new BooleanWritable(Boolean.TRUE), new BytesArray("true")); + } + + @Test + public void testMD5Hash() { + writableTypeToJson(MD5Hash.digest("md5hash"), new BytesArray("\"f9d08276bc85d30d578e8883f3c7e843\"".getBytes())); + } + + @Test + public void testByte() { + writableTypeToJson(new ByteWritable(Byte.MAX_VALUE), new BytesArray(Byte.toString(Byte.MAX_VALUE))); + } + + @Test + public void testByteArray() { + writableTypeToJson(new BytesWritable("byte array".getBytes()), new BytesArray("\"Ynl0ZSBhcnJheQ==\"")); + } + + @Test + public void testArray() { + writableTypeToJson(new ArrayWritable(new String[]{"one", "two"}), new BytesArray("")); + } + + @Test + public void testMap() { + 
LinkedMapWritable map = new LinkedMapWritable(); + map.put(new Text("key"), new IntWritable(1)); + map.put(new BooleanWritable(Boolean.TRUE), new ArrayWritable(new String[]{"one", "two"})); + writableTypeToJson(map, new BytesArray("")); + } + + private void writableTypeToJson(Writable obj, BytesArray expected) { + ContentBuilder.generate(out, new DeleteBulkFactory.NoDataWriter()).value(obj).flush().close(); + Assert.assertEquals(expected.toString(), out.bytes().toString()); + } +} From b6e8f7e72b1148ab1b783c36e42c0c0004102432 Mon Sep 17 00:00:00 2001 From: vincentarnaud90 <37067128+vincentarnaud90@users.noreply.github.com> Date: Mon, 22 Jul 2019 14:14:25 +0200 Subject: [PATCH 2/4] Update NoDataWriterTypeToJsonTest.java Add license headers --- .../NoDataWriterTypeToJsonTest.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/mr/src/test/java/org/elasticsearch/hadoop/serialization/NoDataWriterTypeToJsonTest.java b/mr/src/test/java/org/elasticsearch/hadoop/serialization/NoDataWriterTypeToJsonTest.java index 846d09248..fd796f779 100644 --- a/mr/src/test/java/org/elasticsearch/hadoop/serialization/NoDataWriterTypeToJsonTest.java +++ b/mr/src/test/java/org/elasticsearch/hadoop/serialization/NoDataWriterTypeToJsonTest.java @@ -1,3 +1,21 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.elasticsearch.hadoop.serialization; import org.apache.hadoop.io.*; From ab94d45f95dde934456141c369cd20654fc10daf Mon Sep 17 00:00:00 2001 From: vincentarnaud90 <37067128+vincentarnaud90@users.noreply.github.com> Date: Wed, 28 Aug 2019 16:02:56 +0200 Subject: [PATCH 3/4] Code formatting Revert changes from codestyle and generate data in CommandTest with loops --- .../hadoop/rest/InitializationUtils.java | 5 +- .../serialization/bulk/DeleteBulkFactory.java | 52 +++++----- .../hadoop/serialization/CommandTest.java | 94 +++++++------------ 3 files changed, 60 insertions(+), 91 deletions(-) diff --git a/mr/src/main/java/org/elasticsearch/hadoop/rest/InitializationUtils.java b/mr/src/main/java/org/elasticsearch/hadoop/rest/InitializationUtils.java index f08a21d1a..08718dfe7 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/rest/InitializationUtils.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/rest/InitializationUtils.java @@ -299,8 +299,7 @@ public static void validateSettingsForWriting(Settings settings) { throw new EsHadoopIllegalArgumentException("Cannot use TTL on index/update requests in ES 6.x and " + "above. Please remove the [" + ConfigurationOptions.ES_MAPPING_TTL + "] setting."); } - } - else { + } else { if (StringUtils.hasText(settings.getMappingTtl())) { LOG.warn("Setting [" + ConfigurationOptions.ES_MAPPING_TTL + "] is deprecated! 
Support for [ttl] on " + "indexing and update requests has been removed in ES 6.x and above!"); @@ -387,7 +386,7 @@ private static void doCheckIndexExistence(Settings settings, RestRepository clie settings.getResourceWrite(), ConfigurationOptions.ES_INDEX_AUTO_CREATE, settings.getIndexAutoCreate())); } } - + public static boolean setMetadataExtractorIfNotSet(Settings settings, Class clazz, Log log) { if (!StringUtils.hasText(settings.getMappingMetadataExtractorClassName())) { Log logger = (log != null ? log : LogFactory.getLog(clazz)); diff --git a/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/DeleteBulkFactory.java b/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/DeleteBulkFactory.java index 9c65f3f6d..fc0d9a049 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/DeleteBulkFactory.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/DeleteBulkFactory.java @@ -18,7 +18,19 @@ */ package org.elasticsearch.hadoop.serialization.bulk; -import org.apache.hadoop.io.*; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.MD5Hash; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.UTF8; +import org.apache.hadoop.io.VIntWritable; +import org.apache.hadoop.io.VLongWritable; import org.elasticsearch.hadoop.cfg.ConfigurationOptions; import org.elasticsearch.hadoop.cfg.Settings; import org.elasticsearch.hadoop.serialization.Generator; @@ -34,47 +46,35 @@ public static final class NoDataWriter implements ValueWriter { @Override public Result write(Object writable, Generator generator) { - ///delete doesn't require any content but it needs to extract metadata associated to a 
document + //delete doesn't require any content but it needs to extract metadata associated to a document if (writable == null || writable instanceof NullWritable) { generator.writeNull(); - } - else if (writable instanceof Text) { + } else if (writable instanceof Text) { Text text = (Text) writable; generator.writeUTF8String(text.getBytes(), 0, text.getLength()); - } - else if (writable instanceof UTF8) { + } else if (writable instanceof UTF8) { UTF8 utf8 = (UTF8) writable; generator.writeUTF8String(utf8.getBytes(), 0, utf8.getLength()); - } - else if (writable instanceof IntWritable) { + } else if (writable instanceof IntWritable) { generator.writeNumber(((IntWritable) writable).get()); - } - else if (writable instanceof LongWritable) { + } else if (writable instanceof LongWritable) { generator.writeNumber(((LongWritable) writable).get()); - } - else if (writable instanceof VLongWritable) { + } else if (writable instanceof VLongWritable) { generator.writeNumber(((VLongWritable) writable).get()); - } - else if (writable instanceof VIntWritable) { + } else if (writable instanceof VIntWritable) { generator.writeNumber(((VIntWritable) writable).get()); - } - else if (writable instanceof ByteWritable) { + } else if (writable instanceof ByteWritable) { generator.writeNumber(((ByteWritable) writable).get()); - } - else if (writable instanceof DoubleWritable) { + } else if (writable instanceof DoubleWritable) { generator.writeNumber(((DoubleWritable) writable).get()); - } - else if (writable instanceof FloatWritable) { + } else if (writable instanceof FloatWritable) { generator.writeNumber(((FloatWritable) writable).get()); - } - else if (writable instanceof BooleanWritable) { + } else if (writable instanceof BooleanWritable) { generator.writeBoolean(((BooleanWritable) writable).get()); - } - else if (writable instanceof BytesWritable) { + } else if (writable instanceof BytesWritable) { BytesWritable bw = (BytesWritable) writable; generator.writeBinary(bw.getBytes(), 0, 
bw.getLength()); - } - else if (writable instanceof MD5Hash) { + } else if (writable instanceof MD5Hash) { generator.writeString(writable.toString()); } return Result.SUCCESFUL(); diff --git a/mr/src/test/java/org/elasticsearch/hadoop/serialization/CommandTest.java b/mr/src/test/java/org/elasticsearch/hadoop/serialization/CommandTest.java index 311eb0251..abf5c18cf 100644 --- a/mr/src/test/java/org/elasticsearch/hadoop/serialization/CommandTest.java +++ b/mr/src/test/java/org/elasticsearch/hadoop/serialization/CommandTest.java @@ -18,8 +18,10 @@ */ package org.elasticsearch.hadoop.serialization; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; @@ -65,56 +67,29 @@ public static Collection data() { throw new IllegalStateException("CommandTest needs new version updates."); } - return Arrays.asList(new Object[][]{ - {ConfigurationOptions.ES_OPERATION_INDEX, false, EsMajorVersion.V_1_X}, - {ConfigurationOptions.ES_OPERATION_CREATE, false, EsMajorVersion.V_1_X}, - {ConfigurationOptions.ES_OPERATION_UPDATE, false, EsMajorVersion.V_1_X}, - {ConfigurationOptions.ES_OPERATION_DELETE, false, EsMajorVersion.V_1_X}, - {ConfigurationOptions.ES_OPERATION_INDEX, true, EsMajorVersion.V_1_X}, - {ConfigurationOptions.ES_OPERATION_CREATE, true, EsMajorVersion.V_1_X}, - {ConfigurationOptions.ES_OPERATION_UPDATE, true, EsMajorVersion.V_1_X}, - {ConfigurationOptions.ES_OPERATION_DELETE, true, EsMajorVersion.V_1_X}, - {ConfigurationOptions.ES_OPERATION_INDEX, false, EsMajorVersion.V_2_X}, - {ConfigurationOptions.ES_OPERATION_CREATE, false, EsMajorVersion.V_2_X}, - {ConfigurationOptions.ES_OPERATION_UPDATE, false, EsMajorVersion.V_2_X}, - {ConfigurationOptions.ES_OPERATION_DELETE, false, EsMajorVersion.V_2_X}, - {ConfigurationOptions.ES_OPERATION_INDEX, true, EsMajorVersion.V_2_X}, - {ConfigurationOptions.ES_OPERATION_CREATE, true, EsMajorVersion.V_2_X}, - 
{ConfigurationOptions.ES_OPERATION_UPDATE, true, EsMajorVersion.V_2_X}, - {ConfigurationOptions.ES_OPERATION_DELETE, true, EsMajorVersion.V_2_X}, - {ConfigurationOptions.ES_OPERATION_INDEX, false, EsMajorVersion.V_5_X}, - {ConfigurationOptions.ES_OPERATION_CREATE, false, EsMajorVersion.V_5_X}, - {ConfigurationOptions.ES_OPERATION_UPDATE, false, EsMajorVersion.V_5_X}, - {ConfigurationOptions.ES_OPERATION_DELETE, false, EsMajorVersion.V_5_X}, - {ConfigurationOptions.ES_OPERATION_INDEX, true, EsMajorVersion.V_5_X}, - {ConfigurationOptions.ES_OPERATION_CREATE, true, EsMajorVersion.V_5_X}, - {ConfigurationOptions.ES_OPERATION_UPDATE, true, EsMajorVersion.V_5_X}, - {ConfigurationOptions.ES_OPERATION_DELETE, true, EsMajorVersion.V_5_X}, - {ConfigurationOptions.ES_OPERATION_INDEX, false, EsMajorVersion.V_6_X}, - {ConfigurationOptions.ES_OPERATION_CREATE, false, EsMajorVersion.V_6_X}, - {ConfigurationOptions.ES_OPERATION_UPDATE, false, EsMajorVersion.V_6_X}, - {ConfigurationOptions.ES_OPERATION_DELETE, false, EsMajorVersion.V_6_X}, - {ConfigurationOptions.ES_OPERATION_INDEX, true, EsMajorVersion.V_6_X}, - {ConfigurationOptions.ES_OPERATION_CREATE, true, EsMajorVersion.V_6_X}, - {ConfigurationOptions.ES_OPERATION_UPDATE, true, EsMajorVersion.V_6_X}, - {ConfigurationOptions.ES_OPERATION_DELETE, true, EsMajorVersion.V_6_X}, - {ConfigurationOptions.ES_OPERATION_INDEX, false, EsMajorVersion.V_7_X}, - {ConfigurationOptions.ES_OPERATION_CREATE, false, EsMajorVersion.V_7_X}, - {ConfigurationOptions.ES_OPERATION_UPDATE, false, EsMajorVersion.V_7_X}, - {ConfigurationOptions.ES_OPERATION_DELETE, false, EsMajorVersion.V_7_X}, - {ConfigurationOptions.ES_OPERATION_INDEX, true, EsMajorVersion.V_7_X}, - {ConfigurationOptions.ES_OPERATION_CREATE, true, EsMajorVersion.V_7_X}, - {ConfigurationOptions.ES_OPERATION_UPDATE, true, EsMajorVersion.V_7_X}, - {ConfigurationOptions.ES_OPERATION_DELETE, true, EsMajorVersion.V_7_X}, - {ConfigurationOptions.ES_OPERATION_INDEX, false, 
EsMajorVersion.V_8_X}, - {ConfigurationOptions.ES_OPERATION_CREATE, false, EsMajorVersion.V_8_X}, - {ConfigurationOptions.ES_OPERATION_UPDATE, false, EsMajorVersion.V_8_X}, - {ConfigurationOptions.ES_OPERATION_DELETE, false, EsMajorVersion.V_8_X}, - {ConfigurationOptions.ES_OPERATION_INDEX, true, EsMajorVersion.V_8_X}, - {ConfigurationOptions.ES_OPERATION_CREATE, true, EsMajorVersion.V_8_X}, - {ConfigurationOptions.ES_OPERATION_UPDATE, true, EsMajorVersion.V_8_X}, - {ConfigurationOptions.ES_OPERATION_DELETE, true, EsMajorVersion.V_8_X} - }); + Collection result = new ArrayList<>(); + + String[] operations = new String[]{ConfigurationOptions.ES_OPERATION_INDEX, + ConfigurationOptions.ES_OPERATION_CREATE, + ConfigurationOptions.ES_OPERATION_UPDATE, + ConfigurationOptions.ES_OPERATION_DELETE}; + boolean[] asJsons = new boolean[]{false, true}; + EsMajorVersion[] versions = new EsMajorVersion[]{EsMajorVersion.V_1_X, + EsMajorVersion.V_2_X, + EsMajorVersion.V_5_X, + EsMajorVersion.V_6_X, + EsMajorVersion.V_7_X, + EsMajorVersion.V_8_X}; + + for (EsMajorVersion version : versions) { + for (boolean asJson : asJsons) { + for (String operation : operations) { + result.add(new Object[]{ operation, asJson, version }); + } + } + } + + return result; } public CommandTest(String operation, boolean jsonInput, EsMajorVersion version) { @@ -131,8 +106,7 @@ public void prepare() { map.put("n", 1); map.put("s", "v"); data = map; - } - else { + } else { data = "{\"n\":1,\"s\":\"v\"}"; } } @@ -287,8 +261,7 @@ public void testIdPattern() throws Exception { Settings settings = settings(); if (version.onOrAfter(EsMajorVersion.V_8_X)) { settings.setResourceWrite("{n}"); - } - else { + } else { settings.setResourceWrite("foo/{n}"); } @@ -296,8 +269,7 @@ public void testIdPattern() throws Exception { String header; if (version.onOrAfter(EsMajorVersion.V_8_X)) { header = "{\"_index\":\"1\"" + (isUpdateOp() ? 
",\"_id\":2" : "") + "}"; - } - else { + } else { header = "{\"_index\":\"foo\",\"_type\":\"1\"" + (isUpdateOp() ? ",\"_id\":2" : "") + "}"; } String result = "{\"" + operation + "\":" + header + "}" + map(); @@ -345,7 +317,7 @@ public void testUpdateOnlyInlineScript5X() throws Exception { create(set).write(data).copyTo(ba); String result = "{\"" + operation + "\":{\"_id\":2,\"_retry_on_conflict\":3}}\n" + - "{\"script\":{\"inline\":\"counter = 3\",\"lang\":\"groovy\"}}\n"; + "{\"script\":{\"inline\":\"counter = 3\",\"lang\":\"groovy\"}}\n"; assertEquals(result, ba.toString()); } @@ -475,7 +447,7 @@ public void testUpdateOnlyParamInlineScript5X() throws Exception { String result = "{\"" + operation + "\":{\"_id\":1}}\n" + - "{\"script\":{\"inline\":\"counter = param1; anothercounter = param2\",\"lang\":\"groovy\",\"params\":{\"param1\":1,\"param2\":1}}}\n"; + "{\"script\":{\"inline\":\"counter = param1; anothercounter = param2\",\"lang\":\"groovy\",\"params\":{\"param1\":1,\"param2\":1}}}\n"; assertEquals(result, ba.toString()); } @@ -553,8 +525,7 @@ private Settings settings() { set.setProperty(ConfigurationOptions.ES_INPUT_JSON, Boolean.toString(jsonInput)); if (isDeleteOP()) { InitializationUtils.setValueWriterIfNotSet(set, DeleteBulkFactory.NoDataWriter.class, null); - } - else { + } else { InitializationUtils.setValueWriterIfNotSet(set, JdkValueWriter.class, null); } InitializationUtils.setFieldExtractorIfNotSet(set, MapFieldExtractor.class, null); @@ -564,8 +535,7 @@ private Settings settings() { set.setProperty(ConfigurationOptions.ES_WRITE_OPERATION, operation); if (version.onOrAfter(EsMajorVersion.V_8_X)) { set.setResourceWrite("foo"); - } - else { + } else { set.setResourceWrite("foo/bar"); } if (isUpdateOp()) { From e7bfc3943aeb9ab057bd45003f304501449044ed Mon Sep 17 00:00:00 2001 From: Vincent Arnaud Date: Fri, 11 Oct 2019 05:47:19 +0200 Subject: [PATCH 4/4] implement a new BulkCommand to support delete operation --- 
.../elasticsearch/hadoop/cfg/Settings.java | 5 +- .../bulk/AbstractBulkFactory.java | 64 ++++---- .../serialization/bulk/DeleteBulkFactory.java | 42 +----- .../bulk/DeleteTemplatedBulk.java | 41 +++++ .../serialization/bulk/TemplatedBulk.java | 12 +- .../hadoop/serialization/CommandTest.java | 27 ++-- .../NoDataWriterTypeToJsonTest.java | 140 ------------------ 7 files changed, 90 insertions(+), 241 deletions(-) create mode 100644 mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/DeleteTemplatedBulk.java delete mode 100644 mr/src/test/java/org/elasticsearch/hadoop/serialization/NoDataWriterTypeToJsonTest.java diff --git a/mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java b/mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java index 317db80ae..6ffe2e030 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java @@ -221,10 +221,6 @@ public String getSerializerValueWriterClassName() { return getProperty(ES_SERIALIZATION_WRITER_VALUE_CLASS); } - public Settings setSerializerValueWriterClassName(String className) { - setProperty(ES_SERIALIZATION_WRITER_VALUE_CLASS, className); - return this; - } public String getSerializerBytesConverterClassName() { return getProperty(ES_SERIALIZATION_WRITER_BYTES_CLASS); @@ -770,3 +766,4 @@ public String save() { public abstract Properties asProperties(); } + diff --git a/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/AbstractBulkFactory.java b/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/AbstractBulkFactory.java index bb5be9445..054e540d3 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/AbstractBulkFactory.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/AbstractBulkFactory.java @@ -18,13 +18,10 @@ */ package org.elasticsearch.hadoop.serialization.bulk; -import java.util.ArrayList; -import java.util.List; -import java.util.Date; - import 
org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException; +import org.elasticsearch.hadoop.cfg.ConfigurationOptions; import org.elasticsearch.hadoop.cfg.Settings; import org.elasticsearch.hadoop.rest.Resource; import org.elasticsearch.hadoop.serialization.builder.ValueWriter; @@ -44,6 +41,10 @@ import org.elasticsearch.hadoop.util.ObjectUtils; import org.elasticsearch.hadoop.util.StringUtils; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + public abstract class AbstractBulkFactory implements BulkFactory { private static Log log = LogFactory.getLog(AbstractBulkFactory.class); @@ -60,13 +61,13 @@ public abstract class AbstractBulkFactory implements BulkFactory { // used when specifying an index pattern private IndexExtractor indexExtractor; private FieldExtractor idExtractor, - typeExtractor, - parentExtractor, - routingExtractor, - versionExtractor, - ttlExtractor, - timestampExtractor, - paramsExtractor; + typeExtractor, + parentExtractor, + routingExtractor, + versionExtractor, + ttlExtractor, + timestampExtractor, + paramsExtractor; private final FieldExtractor versionTypeExtractor = new FieldExtractor() { @@ -139,14 +140,10 @@ void doWrite(Object value) { } pool.get().bytes(valueString); - } - - else if (value instanceof Date) { - String valueString = (value == null ? "null": Long.toString(((Date) value).getTime())); + } else if (value instanceof Date) { + String valueString = (value == null ? 
"null" : Long.toString(((Date) value).getTime())); pool.get().bytes(valueString); - } - - else if (value instanceof RawJson) { + } else if (value instanceof RawJson) { pool.get().bytes(((RawJson) value).json()); } // library specific type - use the value writer (a bit overkill but handles collections/arrays properly) @@ -249,17 +246,16 @@ private void initExtractorsFromSettings(final Settings settings) { ttlExtractor = jsonExtractors.ttl(); timestampExtractor = jsonExtractors.timestamp(); paramsExtractor = jsonExtractors.params(); - } - else { + } else { // init extractors (if needed) if (settings.getMappingId() != null) { settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingId()); - idExtractor = ObjectUtils. instantiate(settings.getMappingIdExtractorClassName(), + idExtractor = ObjectUtils.instantiate(settings.getMappingIdExtractorClassName(), settings); } if (settings.getMappingParent() != null) { settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingParent()); - parentExtractor = ObjectUtils. instantiate( + parentExtractor = ObjectUtils.instantiate( settings.getMappingParentExtractorClassName(), settings); } // Two different properties can satisfy the routing field extraction @@ -267,7 +263,7 @@ private void initExtractorsFromSettings(final Settings settings) { List routings = new ArrayList(2); if (settings.getMappingRouting() != null) { settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingRouting()); - FieldExtractor extractor = ObjectUtils. 
instantiate( + FieldExtractor extractor = ObjectUtils.instantiate( settings.getMappingRoutingExtractorClassName(), settings); // If we specify a routing field, return NOT_FOUND if we ultimately cannot find one instead of skipping routingResponse = ChainedFieldExtractor.NoValueHandler.NOT_FOUND; @@ -286,22 +282,22 @@ private void initExtractorsFromSettings(final Settings settings) { if (settings.getMappingTtl() != null) { settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingTtl()); - ttlExtractor = ObjectUtils. instantiate(settings.getMappingTtlExtractorClassName(), + ttlExtractor = ObjectUtils.instantiate(settings.getMappingTtlExtractorClassName(), settings); } if (settings.getMappingVersion() != null) { settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingVersion()); - versionExtractor = ObjectUtils. instantiate( + versionExtractor = ObjectUtils.instantiate( settings.getMappingVersionExtractorClassName(), settings); } if (settings.getMappingTimestamp() != null) { settings.setProperty(ConstantFieldExtractor.PROPERTY, settings.getMappingTimestamp()); - timestampExtractor = ObjectUtils. instantiate( + timestampExtractor = ObjectUtils.instantiate( settings.getMappingTimestampExtractorClassName(), settings); } // create adapter - IndexExtractor iformat = ObjectUtils. 
instantiate(settings.getMappingIndexExtractorClassName(), settings); + IndexExtractor iformat = ObjectUtils.instantiate(settings.getMappingIndexExtractorClassName(), settings); iformat.compile(new Resource(settings, false).toString()); if (iformat.hasPattern()) { @@ -371,14 +367,15 @@ public BulkCommand createBulk() { if (!isStatic) { before.add(new DynamicHeaderRef()); after.add(new DynamicEndRef()); - } - else { + } else { writeObjectHeader(before); before = compact(before); writeObjectEnd(after); after = compact(after); } - + if (ConfigurationOptions.ES_OPERATION_DELETE.equals(getOperation())) { + return new DeleteTemplatedBulk(before, after); + } boolean isScriptUpdate = settings.hasUpdateScript(); // compress pieces if (jsonInput) { @@ -523,15 +520,13 @@ private List compact(List list) { stringAccumulator.setLength(0); } compacted.add(object); - } - else if (object instanceof FieldExtractor) { + } else if (object instanceof FieldExtractor) { if (stringAccumulator.length() > 0) { compacted.add(new BytesArray(stringAccumulator.toString())); stringAccumulator.setLength(0); } compacted.add(new FieldWriter((FieldExtractor) object)); - } - else { + } else { stringAccumulator.append(object.toString()); } } @@ -546,3 +541,4 @@ protected FieldExtractor getParamExtractor() { return paramsExtractor; } } + diff --git a/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/DeleteBulkFactory.java b/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/DeleteBulkFactory.java index fc0d9a049..8ce784542 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/DeleteBulkFactory.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/DeleteBulkFactory.java @@ -42,48 +42,9 @@ public class DeleteBulkFactory extends AbstractBulkFactory { - public static final class NoDataWriter implements ValueWriter { - - @Override - public Result write(Object writable, Generator generator) { - //delete doesn't require any content but it needs to 
extract metadata associated to a document - if (writable == null || writable instanceof NullWritable) { - generator.writeNull(); - } else if (writable instanceof Text) { - Text text = (Text) writable; - generator.writeUTF8String(text.getBytes(), 0, text.getLength()); - } else if (writable instanceof UTF8) { - UTF8 utf8 = (UTF8) writable; - generator.writeUTF8String(utf8.getBytes(), 0, utf8.getLength()); - } else if (writable instanceof IntWritable) { - generator.writeNumber(((IntWritable) writable).get()); - } else if (writable instanceof LongWritable) { - generator.writeNumber(((LongWritable) writable).get()); - } else if (writable instanceof VLongWritable) { - generator.writeNumber(((VLongWritable) writable).get()); - } else if (writable instanceof VIntWritable) { - generator.writeNumber(((VIntWritable) writable).get()); - } else if (writable instanceof ByteWritable) { - generator.writeNumber(((ByteWritable) writable).get()); - } else if (writable instanceof DoubleWritable) { - generator.writeNumber(((DoubleWritable) writable).get()); - } else if (writable instanceof FloatWritable) { - generator.writeNumber(((FloatWritable) writable).get()); - } else if (writable instanceof BooleanWritable) { - generator.writeBoolean(((BooleanWritable) writable).get()); - } else if (writable instanceof BytesWritable) { - BytesWritable bw = (BytesWritable) writable; - generator.writeBinary(bw.getBytes(), 0, bw.getLength()); - } else if (writable instanceof MD5Hash) { - generator.writeString(writable.toString()); - } - return Result.SUCCESFUL(); - } - } public DeleteBulkFactory(Settings settings, MetadataExtractor metaExtractor, EsMajorVersion version) { - // we only want a specific serializer for this particular bulk factory - super(settings.copy().setSerializerValueWriterClassName(NoDataWriter.class.getName()), metaExtractor, version); + super(settings, metaExtractor, version); } @Override @@ -97,3 +58,4 @@ protected void writeObjectEnd(List list) { list.add(StringUtils.EMPTY); } 
} + diff --git a/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/DeleteTemplatedBulk.java b/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/DeleteTemplatedBulk.java new file mode 100644 index 000000000..69ca78d26 --- /dev/null +++ b/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/DeleteTemplatedBulk.java @@ -0,0 +1,41 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.hadoop.serialization.bulk; + +import org.elasticsearch.hadoop.util.BytesRef; + +import java.util.Collection; + +public class DeleteTemplatedBulk extends TemplatedBulk { + + DeleteTemplatedBulk(Collection beforeObject, Collection afterObject) { + super(beforeObject, afterObject, null); + } + + @Override + public BytesRef write(Object object) { + ref.reset(); + scratchPad.reset(); + Object processed = preProcess(object, scratchPad); + writeTemplate(beforeObject, processed); + writeTemplate(afterObject, processed); + return ref; + } +} + diff --git a/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/TemplatedBulk.java b/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/TemplatedBulk.java index 26ed9f606..26ec13e13 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/TemplatedBulk.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/TemplatedBulk.java @@ -32,11 +32,11 @@ class TemplatedBulk implements BulkCommand { - private final Collection beforeObject; - private final Collection afterObject; + protected final Collection beforeObject; + protected final Collection afterObject; - private BytesArray scratchPad = new BytesArray(1024); - private BytesRef ref = new BytesRef(); + protected BytesArray scratchPad = new BytesArray(1024); + protected BytesRef ref = new BytesRef(); private final ValueWriter valueWriter; @@ -71,7 +71,7 @@ protected void doWriteObject(Object object, BytesArray storage, ValueWriter w ContentBuilder.generate(bos, writer).value(object).flush().close(); } - private void writeTemplate(Collection template, Object object) { + protected void writeTemplate(Collection template, Object object) { for (Object item : template) { if (item instanceof BytesArray) { ref.add((BytesArray) item); @@ -89,4 +89,4 @@ else if (item instanceof DynamicContentRef) { } } } -} \ No newline at end of file +} diff --git 
a/mr/src/test/java/org/elasticsearch/hadoop/serialization/CommandTest.java b/mr/src/test/java/org/elasticsearch/hadoop/serialization/CommandTest.java index abf5c18cf..82ce166e1 100644 --- a/mr/src/test/java/org/elasticsearch/hadoop/serialization/CommandTest.java +++ b/mr/src/test/java/org/elasticsearch/hadoop/serialization/CommandTest.java @@ -18,13 +18,6 @@ */ package org.elasticsearch.hadoop.serialization; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.Map; - import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException; import org.elasticsearch.hadoop.cfg.ConfigurationOptions; import org.elasticsearch.hadoop.cfg.Settings; @@ -33,7 +26,6 @@ import org.elasticsearch.hadoop.serialization.builder.JdkValueWriter; import org.elasticsearch.hadoop.serialization.bulk.BulkCommand; import org.elasticsearch.hadoop.serialization.bulk.BulkCommands; -import org.elasticsearch.hadoop.serialization.bulk.DeleteBulkFactory; import org.elasticsearch.hadoop.util.BytesArray; import org.elasticsearch.hadoop.util.EsMajorVersion; import org.elasticsearch.hadoop.util.StringUtils; @@ -44,8 +36,12 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; -import static org.junit.Assert.assertEquals; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.Map; +import static org.junit.Assert.assertEquals; import static org.junit.Assume.assumeFalse; import static org.junit.Assume.assumeTrue; @@ -84,7 +80,7 @@ public static Collection data() { for (EsMajorVersion version : versions) { for (boolean asJson : asJsons) { for (String operation : operations) { - result.add(new Object[]{ operation, asJson, version }); + result.add(new Object[]{operation, asJson, version}); } } } @@ -317,7 +313,7 @@ public void testUpdateOnlyInlineScript5X() throws Exception { 
create(set).write(data).copyTo(ba); String result = "{\"" + operation + "\":{\"_id\":2,\"_retry_on_conflict\":3}}\n" + - "{\"script\":{\"inline\":\"counter = 3\",\"lang\":\"groovy\"}}\n"; + "{\"script\":{\"inline\":\"counter = 3\",\"lang\":\"groovy\"}}\n"; assertEquals(result, ba.toString()); } @@ -447,7 +443,7 @@ public void testUpdateOnlyParamInlineScript5X() throws Exception { String result = "{\"" + operation + "\":{\"_id\":1}}\n" + - "{\"script\":{\"inline\":\"counter = param1; anothercounter = param2\",\"lang\":\"groovy\",\"params\":{\"param1\":1,\"param2\":1}}}\n"; + "{\"script\":{\"inline\":\"counter = param1; anothercounter = param2\",\"lang\":\"groovy\",\"params\":{\"param1\":1,\"param2\":1}}}\n"; assertEquals(result, ba.toString()); } @@ -523,11 +519,7 @@ private Settings settings() { set.setInternalVersion(version); set.setProperty(ConfigurationOptions.ES_INPUT_JSON, Boolean.toString(jsonInput)); - if (isDeleteOP()) { - InitializationUtils.setValueWriterIfNotSet(set, DeleteBulkFactory.NoDataWriter.class, null); - } else { - InitializationUtils.setValueWriterIfNotSet(set, JdkValueWriter.class, null); - } + InitializationUtils.setValueWriterIfNotSet(set, JdkValueWriter.class, null); InitializationUtils.setFieldExtractorIfNotSet(set, MapFieldExtractor.class, null); InitializationUtils.setBytesConverterIfNeeded(set, JdkBytesConverter.class, null); InitializationUtils.setUserProviderIfNotSet(set, HadoopUserProvider.class, null); @@ -577,3 +569,4 @@ private boolean isDeleteOP() { return ConfigurationOptions.ES_OPERATION_DELETE.equals(operation); } } + diff --git a/mr/src/test/java/org/elasticsearch/hadoop/serialization/NoDataWriterTypeToJsonTest.java b/mr/src/test/java/org/elasticsearch/hadoop/serialization/NoDataWriterTypeToJsonTest.java deleted file mode 100644 index fd796f779..000000000 --- a/mr/src/test/java/org/elasticsearch/hadoop/serialization/NoDataWriterTypeToJsonTest.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to Elasticsearch under one or 
more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.hadoop.serialization; - -import org.apache.hadoop.io.*; -import org.elasticsearch.hadoop.mr.LinkedMapWritable; -import org.elasticsearch.hadoop.serialization.builder.ContentBuilder; -import org.elasticsearch.hadoop.serialization.bulk.DeleteBulkFactory; -import org.elasticsearch.hadoop.util.BytesArray; -import org.elasticsearch.hadoop.util.FastByteArrayOutputStream; -import org.junit.*; - -@SuppressWarnings("deprecation") -public class NoDataWriterTypeToJsonTest { - private static FastByteArrayOutputStream out; - - @BeforeClass - public static void beforeClass() { - out = new FastByteArrayOutputStream(); - } - - @Before - public void start() { - out.reset(); - } - - @After - public void after() { - out.reset(); - } - - @AfterClass - public static void afterClass() { - out = null; - } - - @Test - public void testNull() { - writableTypeToJson(null, new BytesArray("null")); - } - - @Test - public void testNullWritable() throws Exception { - writableTypeToJson(NullWritable.get(), new BytesArray("null")); - } - - @Test - public void testString() { - writableTypeToJson(new Text("some text"), new BytesArray("\"some text\"".getBytes())); - } - - @Test - public void testUTF8() { - writableTypeToJson(new 
UTF8("some utf8"), new BytesArray("\"some utf8\"".getBytes())); - } - - @Test - public void testInteger() { - writableTypeToJson(new IntWritable(Integer.MAX_VALUE), new BytesArray(Integer.toString(Integer.MAX_VALUE))); - } - - @Test - public void testLong() { - writableTypeToJson(new LongWritable(Long.MAX_VALUE), new BytesArray(Long.toString(Long.MAX_VALUE))); - } - - @Test - public void testVInteger() { - writableTypeToJson(new VIntWritable(Integer.MAX_VALUE), new BytesArray(Integer.toString(Integer.MAX_VALUE))); - } - - @Test - public void testVLong() { - writableTypeToJson(new VLongWritable(Long.MAX_VALUE), new BytesArray(Long.toString(Long.MAX_VALUE))); - } - - @Test - public void testDouble() { - writableTypeToJson(new DoubleWritable(Double.MAX_VALUE), new BytesArray(Double.toString(Double.MAX_VALUE))); - } - - @Test - public void testFloat() { - writableTypeToJson(new FloatWritable(Float.MAX_VALUE), new BytesArray(Float.toString(Float.MAX_VALUE))); - } - - @Test - public void testBoolean() { - writableTypeToJson(new BooleanWritable(Boolean.TRUE), new BytesArray("true")); - } - - @Test - public void testMD5Hash() { - writableTypeToJson(MD5Hash.digest("md5hash"), new BytesArray("\"f9d08276bc85d30d578e8883f3c7e843\"".getBytes())); - } - - @Test - public void testByte() { - writableTypeToJson(new ByteWritable(Byte.MAX_VALUE), new BytesArray(Byte.toString(Byte.MAX_VALUE))); - } - - @Test - public void testByteArray() { - writableTypeToJson(new BytesWritable("byte array".getBytes()), new BytesArray("\"Ynl0ZSBhcnJheQ==\"")); - } - - @Test - public void testArray() { - writableTypeToJson(new ArrayWritable(new String[]{"one", "two"}), new BytesArray("")); - } - - @Test - public void testMap() { - LinkedMapWritable map = new LinkedMapWritable(); - map.put(new Text("key"), new IntWritable(1)); - map.put(new BooleanWritable(Boolean.TRUE), new ArrayWritable(new String[]{"one", "two"})); - writableTypeToJson(map, new BytesArray("")); - } - - private void 
writableTypeToJson(Writable obj, BytesArray expected) { - ContentBuilder.generate(out, new DeleteBulkFactory.NoDataWriter()).value(obj).flush().close(); - Assert.assertEquals(expected.toString(), out.bytes().toString()); - } -}