Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add scripted upsert feature #1454

Merged
merged 8 commits into from
Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ public interface ConfigurationOptions {

String ES_UPDATE_SCRIPT_FILE = "es.update.script.file";
String ES_UPDATE_SCRIPT_INLINE = "es.update.script.inline";
String ES_UPDATE_SCRIPT_UPSERT = "es.update.script.upsert";
String ES_UPDATE_SCRIPT_UPSERT_DEFAULT = "false";
String ES_UPDATE_SCRIPT_STORED = "es.update.script.stored";
String ES_UPDATE_SCRIPT_LEGACY = "es.update.script";
String ES_UPDATE_SCRIPT_LANG = "es.update.script.lang";
Expand Down
9 changes: 9 additions & 0 deletions mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,10 @@ public String getUpdateScriptInline() {
return getLegacyProperty(ES_UPDATE_SCRIPT_LEGACY, ES_UPDATE_SCRIPT_INLINE, null);
}

public Boolean getUpdateScriptUpsert() {
return Booleans.parseBoolean(getProperty(ES_UPDATE_SCRIPT_UPSERT, ES_UPDATE_SCRIPT_UPSERT_DEFAULT));
}

public String getUpdateScriptFile() {
return getProperty(ES_UPDATE_SCRIPT_FILE);
}
Expand Down Expand Up @@ -409,6 +413,11 @@ public boolean hasUpdateScriptParamsJson() {
return hasUpdateScript() && StringUtils.hasText(getUpdateScriptParamsJson());
}

public boolean hasScriptUpsert() {
String op = getOperation();
return ConfigurationOptions.ES_OPERATION_UPSERT.equals(op) && getUpdateScriptUpsert();
}

private String getLegacyProperty(String legacyProperty, String newProperty, String defaultValue) {
String legacy = getProperty(legacyProperty);
if (StringUtils.hasText(legacy)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ protected Object preProcess(Object object, BytesArray storage) {
BytesArray ba = null;
if (ConfigurationOptions.ES_OPERATION_UPSERT.equals(settings.getOperation())) {
ba = storage;
if (settings.hasScriptUpsert()) {
jsonWriter.convert("{}", ba);
scratchPad.reset();
ba = scratchPad;
}
}
else {
scratchPad.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.serialization.builder.ValueWriter;
import org.elasticsearch.hadoop.serialization.json.JacksonJsonGenerator;
import org.elasticsearch.hadoop.util.BytesArray;
import org.elasticsearch.hadoop.util.FastByteArrayOutputStream;

class ScriptTemplateBulk extends TemplatedBulk {

Expand All @@ -38,7 +40,13 @@ class ScriptTemplateBulk extends TemplatedBulk {
@Override
protected void doWriteObject(Object object, BytesArray storage, ValueWriter<?> writer) {
if (ConfigurationOptions.ES_OPERATION_UPSERT.equals(settings.getOperation())) {
super.doWriteObject(object, storage, writer);
if (settings.hasScriptUpsert()) {
FastByteArrayOutputStream bos = new FastByteArrayOutputStream(storage);
JacksonJsonGenerator generator = new JacksonJsonGenerator(bos);
generator.writeBeginObject();
generator.writeEndObject();
generator.close();
} else super.doWriteObject(object, storage, writer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class UpdateBulkFactory extends AbstractBulkFactory {
private final String SCRIPT_1X;
private final String SCRIPT_LANG_1X;

private final boolean HAS_SCRIPT, HAS_LANG;
private final boolean HAS_SCRIPT, HAS_LANG, HAS_SCRIPT_UPSERT;
private final boolean UPSERT;

public UpdateBulkFactory(Settings settings, MetadataExtractor metaExtractor, EsMajorVersion esMajorVersion) {
Expand All @@ -56,6 +56,7 @@ public UpdateBulkFactory(Settings settings, boolean upsert, MetadataExtractor me
RETRY_HEADER = getRequestParameterNames().retryOnConflict + RETRY_ON_FAILURE + "";

HAS_SCRIPT = settings.hasUpdateScript();
HAS_SCRIPT_UPSERT = settings.hasScriptUpsert();
HAS_LANG = StringUtils.hasText(settings.getUpdateScriptLang());

SCRIPT_LANG_5X = ",\"lang\":\"" + settings.getUpdateScriptLang() + "\"";
Expand Down Expand Up @@ -142,13 +143,17 @@ private void writeLegacyFormatting(List<Object> list, Object paramExtractor) {
* "params": ...,
* "lang": "...",
* "script": "...",
* "scripted_upsert":true,
* "upsert": {...}
* }
*/
if (HAS_LANG) {
list.add(SCRIPT_LANG_1X);
}
list.add(SCRIPT_1X);
if (HAS_SCRIPT_UPSERT) {
list.add(",\"scripted_upsert\": true");
}
if (UPSERT) {
list.add(",\"upsert\":");
}
Expand Down Expand Up @@ -181,6 +186,7 @@ private void writeStrictFormatting(List<Object> list, Object paramExtractor, Str
* "lang": "...",
* "params": ...,
* },
* "scripted_upsert":true,
* "upsert": {...}
* }
*/
Expand All @@ -193,6 +199,9 @@ private void writeStrictFormatting(List<Object> list, Object paramExtractor, Str
list.add(paramExtractor);
}
list.add("}");
if (HAS_SCRIPT_UPSERT) {
list.add(",\"scripted_upsert\": true");
}
if (UPSERT) {
list.add(",\"upsert\":");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public static Collection<Object[]> data() {
String[] operations = new String[]{ConfigurationOptions.ES_OPERATION_INDEX,
ConfigurationOptions.ES_OPERATION_CREATE,
ConfigurationOptions.ES_OPERATION_UPDATE,
ConfigurationOptions.ES_OPERATION_UPSERT,
ConfigurationOptions.ES_OPERATION_DELETE};
boolean[] asJsons = new boolean[]{false, true};
EsMajorVersion[] versions = new EsMajorVersion[]{EsMajorVersion.V_1_X,
Expand Down Expand Up @@ -110,6 +111,7 @@ public void prepare() {
@Test
public void testNoHeader() throws Exception {
assumeFalse(ConfigurationOptions.ES_OPERATION_UPDATE.equals(operation));
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
assumeFalse(ConfigurationOptions.ES_OPERATION_DELETE.equals(operation));
create(settings()).write(data).copyTo(ba);
String result = prefix() + "}}" + map();
Expand All @@ -120,6 +122,7 @@ public void testNoHeader() throws Exception {
// check user friendliness and escape the string if needed
public void testConstantId() throws Exception {
assumeFalse(isDeleteOP() && jsonInput);
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
Settings settings = settings();
noId = true;
settings.setProperty(ConfigurationOptions.ES_MAPPING_ID, "<foobar>");
Expand All @@ -133,6 +136,7 @@ public void testConstantId() throws Exception {
@Test
public void testParent() throws Exception {
assumeTrue(version.onOrBefore(EsMajorVersion.V_6_X));
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
assumeFalse(isDeleteOP() && jsonInput);
Settings settings = settings();
settings.setProperty(ConfigurationOptions.ES_MAPPING_PARENT, "<5>");
Expand All @@ -145,6 +149,7 @@ public void testParent() throws Exception {
@Test
public void testParent7X() throws Exception {
assumeTrue(version.onOrAfter(EsMajorVersion.V_7_X));
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
assumeFalse(isDeleteOP() && jsonInput);
Settings settings = settings();
settings.setProperty(ConfigurationOptions.ES_MAPPING_PARENT, "<5>");
Expand All @@ -157,6 +162,7 @@ public void testParent7X() throws Exception {
@Test
public void testVersion() throws Exception {
assumeTrue(version.onOrBefore(EsMajorVersion.V_6_X));
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
assumeFalse(isDeleteOP() && jsonInput);
Settings settings = settings();
settings.setProperty(ConfigurationOptions.ES_MAPPING_VERSION, "<3>");
Expand All @@ -169,6 +175,7 @@ public void testVersion() throws Exception {
@Test
public void testVersion7X() throws Exception {
assumeTrue(version.onOrAfter(EsMajorVersion.V_7_X));
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
assumeFalse(isDeleteOP() && jsonInput);
Settings settings = settings();
settings.setProperty(ConfigurationOptions.ES_MAPPING_VERSION, "<3>");
Expand All @@ -181,6 +188,7 @@ public void testVersion7X() throws Exception {
@Test
public void testTtl() throws Exception {
assumeFalse(isDeleteOP() && jsonInput);
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
Settings settings = settings();
settings.setProperty(ConfigurationOptions.ES_MAPPING_TTL, "<2>");

Expand All @@ -192,6 +200,7 @@ public void testTtl() throws Exception {
@Test
public void testTimestamp() throws Exception {
assumeFalse(isDeleteOP() && jsonInput);
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
Settings settings = settings();
settings.setProperty(ConfigurationOptions.ES_MAPPING_TIMESTAMP, "<3>");
create(settings).write(data).copyTo(ba);
Expand All @@ -202,6 +211,7 @@ public void testTimestamp() throws Exception {
@Test
public void testRouting() throws Exception {
assumeTrue(version.onOrBefore(EsMajorVersion.V_6_X));
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
assumeFalse(isDeleteOP() && jsonInput);
Settings settings = settings();
settings.setProperty(ConfigurationOptions.ES_MAPPING_ROUTING, "<4>");
Expand All @@ -214,6 +224,7 @@ public void testRouting() throws Exception {
@Test
public void testRouting7X() throws Exception {
assumeTrue(version.onOrAfter(EsMajorVersion.V_7_X));
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
assumeFalse(isDeleteOP() && jsonInput);
Settings settings = settings();
settings.setProperty(ConfigurationOptions.ES_MAPPING_ROUTING, "<4>");
Expand All @@ -226,6 +237,7 @@ public void testRouting7X() throws Exception {
@Test
public void testAll() throws Exception {
assumeTrue(version.onOrBefore(EsMajorVersion.V_6_X));
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
assumeFalse(isDeleteOP() && jsonInput);
Settings settings = settings();
settings.setProperty(ConfigurationOptions.ES_MAPPING_ID, "n");
Expand All @@ -240,6 +252,7 @@ public void testAll() throws Exception {
@Test
public void testAll7X() throws Exception {
assumeTrue(version.onOrAfter(EsMajorVersion.V_7_X));
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
assumeFalse(isDeleteOP() && jsonInput);
Settings settings = settings();
settings.setProperty(ConfigurationOptions.ES_MAPPING_ID, "n");
Expand All @@ -254,6 +267,7 @@ public void testAll7X() throws Exception {
@Test
public void testIdPattern() throws Exception {
assumeFalse(isDeleteOP() && jsonInput);
assumeFalse(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
Settings settings = settings();
if (version.onOrAfter(EsMajorVersion.V_8_X)) {
settings.setResourceWrite("{n}");
Expand Down Expand Up @@ -468,6 +482,127 @@ public void testUpdateOnlyParamInlineScript6X() throws Exception {
assertEquals(result, ba.toString());
}

@Test
public void testUpsertParamInlineScript1X() throws Exception {
assumeTrue(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
assumeTrue(version.onOrBefore(EsMajorVersion.V_1_X));
Settings set = settings();

set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_INLINE, "counter = param1; anothercounter = param2");
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:n ");

create(set).write(data).copyTo(ba);

String result =
"{\"update\":{\"_id\":3}}\n" +
"{\"params\":{\"param1\":1,\"param2\":1},\"lang\":\"groovy\",\"script\":\"counter = param1; anothercounter = param2\",\"upsert\":{\"n\":1,\"s\":\"v\"}}\n";

assertEquals(result, ba.toString());
}

@Test
public void testUpsertParamInlineScript5X() throws Exception {
assumeTrue(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
assumeTrue(version.after(EsMajorVersion.V_1_X));
assumeTrue(version.before(EsMajorVersion.V_6_X));
Settings set = settings();

set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_INLINE, "counter = param1; anothercounter = param2");
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:n ");

create(set).write(data).copyTo(ba);

String result =
"{\"update\":{\"_id\":3}}\n" +
"{\"script\":{\"inline\":\"counter = param1; anothercounter = param2\",\"lang\":\"groovy\",\"params\":{\"param1\":1,\"param2\":1}},\"upsert\":{\"n\":1,\"s\":\"v\"}}\n";

assertEquals(result, ba.toString());
}

@Test
public void testUpsertParamInlineScript6X() throws Exception {
assumeTrue(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
assumeTrue(version.onOrAfter(EsMajorVersion.V_6_X));
Settings set = settings();

set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_INLINE, "counter = param1; anothercounter = param2");
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:n ");

create(set).write(data).copyTo(ba);

String result =
"{\"update\":{\"_id\":3}}\n" +
"{\"script\":{\"source\":\"counter = param1; anothercounter = param2\",\"lang\":\"groovy\",\"params\":{\"param1\":1,\"param2\":1}},\"upsert\":{\"n\":1,\"s\":\"v\"}}\n";

assertEquals(result, ba.toString());
}

@Test
public void testScriptedUpsertParamInlineScript1X() throws Exception {
assumeTrue(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
assumeTrue(version.onOrBefore(EsMajorVersion.V_1_X));
Settings set = settings();

set.setProperty(ConfigurationOptions.ES_MAPPING_ID, "n");
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_UPSERT, "true");
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_INLINE, "counter = param1; anothercounter = param2");
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:n ");

create(set).write(data).copyTo(ba);

String result =
"{\"update\":{\"_id\":1}}\n" +
"{\"params\":{\"param1\":1,\"param2\":1},\"lang\":\"groovy\",\"script\":\"counter = param1; anothercounter = param2\",\"scripted_upsert\": true,\"upsert\":{}}\n";

assertEquals(result, ba.toString());
}

@Test
public void testScriptedUpsertParamInlineScript5X() throws Exception {
assumeTrue(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
assumeTrue(version.after(EsMajorVersion.V_1_X));
assumeTrue(version.before(EsMajorVersion.V_6_X));
Settings set = settings();

set.setProperty(ConfigurationOptions.ES_MAPPING_ID, "n");
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_UPSERT, "true");
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_INLINE, "counter = param1; anothercounter = param2");
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:n ");

create(set).write(data).copyTo(ba);

String result =
"{\"update\":{\"_id\":1}}\n" +
"{\"script\":{\"inline\":\"counter = param1; anothercounter = param2\",\"lang\":\"groovy\",\"params\":{\"param1\":1,\"param2\":1}},\"scripted_upsert\": true,\"upsert\":{}}\n";

assertEquals(result, ba.toString());
}
@Test
public void testScriptedUpsertParamInlineScript6X() throws Exception {
assumeTrue(ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation));
assumeTrue(version.onOrAfter(EsMajorVersion.V_6_X));
Settings set = settings();

set.setProperty(ConfigurationOptions.ES_MAPPING_ID, "n");
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_UPSERT, "true");
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_INLINE, "counter = param1; anothercounter = param2");
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");
set.setProperty(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS, " param1:<1>, param2:n ");

create(set).write(data).copyTo(ba);

String result =
"{\"update\":{\"_id\":1}}\n" +
"{\"script\":{\"source\":\"counter = param1; anothercounter = param2\",\"lang\":\"groovy\",\"params\":{\"param1\":1,\"param2\":1}},\"scripted_upsert\": true,\"upsert\":{}}\n";

assertEquals(result, ba.toString());
}

@Test
public void testUpdateOnlyParamFileScript1X() throws Exception {
assumeTrue(ConfigurationOptions.ES_OPERATION_UPDATE.equals(operation));
Expand Down Expand Up @@ -533,6 +668,9 @@ private Settings settings() {
if (isUpdateOp()) {
set.setProperty(ConfigurationOptions.ES_MAPPING_ID, "<2>");
}
if (isUpsertOp()) {
set.setProperty(ConfigurationOptions.ES_MAPPING_ID, "<3>");
}
return set;
}

Expand Down Expand Up @@ -565,6 +703,10 @@ private boolean isUpdateOp() {
return ConfigurationOptions.ES_OPERATION_UPDATE.equals(operation);
}

private boolean isUpsertOp() {
return ConfigurationOptions.ES_OPERATION_UPSERT.equals(operation);
}

private boolean isDeleteOP() {
return ConfigurationOptions.ES_OPERATION_DELETE.equals(operation);
}
Expand Down
Loading