
add scripted upsert feature #1454

Merged (8 commits) on Jan 28, 2022

Conversation

@lucebert (Contributor) commented Mar 30, 2020

resolves #1449

Thank you for submitting a pull request!

Please make sure you have signed our Contributor License Agreement (CLA).
We are not asking you to assign copyright to us, but to give us the right to distribute your code without restriction. We ask this of all contributors in order to assure our users of the origin and continuing existence of the code.
You only need to sign the CLA once.

cla-checker-service bot commented Mar 30, 2020

💚 CLA has been signed

@masseyke (Member) commented Dec 2, 2021

@elasticmachine update branch

@masseyke (Member) commented Jan 5, 2022

I think there might be some additional problems to work out with scripted upserts and es-spark. When I try this (using the scripted upsert example from https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html#scripted_upsert):

    val update_params = "count: 4"
    val update_script = "if ( ctx.op == 'create' ) {ctx._source.counter = params.new_count} else {ctx._source.counter += params.new_count}"
    val es_conf = Map("es.mapping.id" -> "id", "es.mapping.exclude" -> "id", "es.write.operation" -> "upsert", "es.update.script.params" -> update_params, "es.update.script.upsert" -> "true", "es.update.script.inline" -> update_script)
    val sqlContext = new SQLContext(sc)
    val data = Seq(Row("1", 3))
    val rdd: RDD[Row] = sc.parallelize(data)
    val schema = new StructType().add("id", StringType, nullable = false).add("count", IntegerType, nullable = false)
    val df = sqlContext.createDataFrame(rdd, schema)
    df.write.format("es").options(es_conf).save("test")

I get:

org.elasticsearch.hadoop.serialization.EsHadoopSerializationException: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [DataFrameFieldExtractor for field [[4]]] cannot extract value from entity [class java.lang.String] | instance [([1,3],StructType(StructField(id,StringType,false), StructField(count,IntegerType,false)))]
	at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:136) ~[elasticsearch-hadoop-mr-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:170) ~[elasticsearch-hadoop-mr-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:83) ~[elasticsearch-spark_2.12-8.1.0-SNAPSHOT-spark30scala212.jar:8.1.0-SNAPSHOT]
	at org.elasticsearch.spark.sql.EsSparkSQL$.$anonfun$saveToEs$1(EsSparkSQL.scala:101) ~[main/:?]
	at org.elasticsearch.spark.sql.EsSparkSQL$.$anonfun$saveToEs$1$adapted(EsSparkSQL.scala:101) ~[main/:?]
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.2.0.jar:3.2.0]
	at org.apache.spark.scheduler.Task.run(Task.scala:131) ~[spark-core_2.12-3.2.0.jar:3.2.0]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) ~[spark-core_2.12-3.2.0.jar:3.2.0]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462) ~[spark-core_2.12-3.2.0.jar:3.2.0]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) [spark-core_2.12-3.2.0.jar:3.2.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_292]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_292]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [DataFrameFieldExtractor for field [[4]]] cannot extract value from entity [class java.lang.String] | instance [([1,3],StructType(StructField(id,StringType,false), StructField(count,IntegerType,false)))]
	at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:111) ~[elasticsearch-hadoop-mr-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80) ~[elasticsearch-hadoop-mr-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56) ~[elasticsearch-hadoop-mr-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:68) ~[elasticsearch-hadoop-mr-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	... 12 more

It looks like es.update.script.params rejects the literal 4 there and expects a field name instead. Just to get past that, I tried changing the value of es.update.script.params:

    val update_params = "new_count: count"
    val update_script = "if ( ctx.op == 'create' ) {ctx._source.counter = params.new_count} else {ctx._source.counter += params.new_count}"
    val es_conf = Map("es.mapping.id" -> "id", "es.mapping.exclude" -> "id", "es.write.operation" -> "upsert", "es.update.script.params" -> update_params, "es.update.script.upsert" -> "true", "es.update.script.inline" -> update_script)
    val sqlContext = new SQLContext(sc)
    val data = Seq(Row("1", 3))
    val rdd: RDD[Row] = sc.parallelize(data)
    val schema = new StructType().add("id", StringType, nullable = false).add("count", IntegerType, nullable = false)
    val df = sqlContext.createDataFrame(rdd, schema)
    df.write.format("es").options(es_conf).save("test")

But with that I get:

	at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:136) ~[elasticsearch-hadoop-mr-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:170) ~[elasticsearch-hadoop-mr-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:83) ~[elasticsearch-spark_2.12-8.1.0-SNAPSHOT-spark30scala212.jar:8.1.0-SNAPSHOT]
	at org.elasticsearch.spark.sql.EsSparkSQL$.$anonfun$saveToEs$1(EsSparkSQL.scala:101) ~[main/:?]
	at org.elasticsearch.spark.sql.EsSparkSQL$.$anonfun$saveToEs$1$adapted(EsSparkSQL.scala:101) ~[main/:?]
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.2.0.jar:3.2.0]
	at org.apache.spark.scheduler.Task.run(Task.scala:131) ~[spark-core_2.12-3.2.0.jar:3.2.0]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) ~[spark-core_2.12-3.2.0.jar:3.2.0]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462) ~[spark-core_2.12-3.2.0.jar:3.2.0]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) [spark-core_2.12-3.2.0.jar:3.2.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_292]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_292]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
Caused by: java.lang.ClassCastException: java.util.Collections$EmptyMap cannot be cast to scala.Tuple2
	at org.elasticsearch.spark.sql.DataFrameValueWriter.write(DataFrameValueWriter.scala:53) ~[main/:?]
	at org.elasticsearch.hadoop.serialization.builder.ContentBuilder.value(ContentBuilder.java:53) ~[elasticsearch-hadoop-mr-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.doWriteObject(TemplatedBulk.java:71) ~[elasticsearch-hadoop-mr-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at org.elasticsearch.hadoop.serialization.bulk.ScriptTemplateBulk.doWriteObject(ScriptTemplateBulk.java:43) ~[elasticsearch-hadoop-mr-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:58) ~[elasticsearch-hadoop-mr-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry(BulkEntryWriter.java:68) ~[elasticsearch-hadoop-mr-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	... 12 more

It doesn't seem to like the empty map being passed in.
It's possible I'm just doing something wrong in the code above. Do you have any examples that are working? It would be good to get one or more into https://github.com/elastic/elasticsearch-hadoop/blob/master/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala (that's where I've put these to see them fail).
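For context, the cast failure itself can be reproduced without Spark at all. As the review discussion below notes, DataFrameValueWriter.write expects each value to be a (row, schema) Tuple2, so casting a java.util.Map to a Tuple2 fails the runtime checkcast. A minimal sketch (the Tuple2 element types here are illustrative placeholders, not the real row/schema types):

```scala
// Minimal sketch of the failure mode: casting java.util.Collections.emptyMap()
// to a scala.Tuple2 throws the same ClassCastException seen in the stack trace.
import java.util.Collections

val value: AnyRef = Collections.emptyMap[String, AnyRef]()

val threwClassCast: Boolean =
  try {
    val _ = value.asInstanceOf[(AnyRef, AnyRef)] // checkcast to scala.Tuple2 fails here
    false
  } catch {
    case _: ClassCastException => true
  }

println(threwClassCast) // true: EmptyMap cannot be cast to scala.Tuple2
```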

@masseyke (Member) commented Jan 6, 2022

Oh, the first one was just a mistake on my part -- there is special syntax for constants: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html#cfg-update. After using the correct syntax, I run into the same Caused by: java.lang.ClassCastException: java.util.Collections$EmptyMap cannot be cast to scala.Tuple2 error with both examples.
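For reference, a sketch of the first example's options using that constant syntax. This assumes the angle-bracket form described on the configuration page, so new_count:<4> passes the literal 4 rather than looking up a field named 4:

```scala
// Sketch of the corrected options map. Assumption: constants in
// es.update.script.params use the angle-bracket syntax from the es-hadoop
// configuration docs ("new_count:<4>"), while a bare name like "count" would
// pull the value from that field of each document being written.
val updateScript =
  "if (ctx.op == 'create') {ctx._source.counter = params.new_count} " +
  "else {ctx._source.counter += params.new_count}"

val esConf = Map(
  "es.mapping.id"           -> "id",
  "es.mapping.exclude"      -> "id",
  "es.write.operation"      -> "upsert",
  "es.update.script.upsert" -> "true",
  "es.update.script.inline" -> updateScript,
  "es.update.script.params" -> "new_count:<4>"
)

println(esConf("es.update.script.params")) // new_count:<4>
```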

    @@ -38,7 +39,9 @@
    @Override
    protected void doWriteObject(Object object, BytesArray storage, ValueWriter<?> writer) {
        if (ConfigurationOptions.ES_OPERATION_UPSERT.equals(settings.getOperation())) {
            super.doWriteObject(object, storage, writer);
            if (settings.hasScriptUpsert()) {
                super.doWriteObject(Collections.emptyMap(), storage, writer);
A review comment on this diff (Member):

DataFrameValueWriter::write can't handle this being a Collection rather than a Tuple2. But if I change this line to

storage.add("{}");

then it seems to work (at least for my one test case). There might be a cleaner way to do that.

A follow-up review comment (Member):

Maybe something like this instead:

                FastByteArrayOutputStream bos = new FastByteArrayOutputStream(storage);
                JacksonJsonGenerator generator = new JacksonJsonGenerator(bos);
                generator.writeBeginObject();
                generator.writeEndObject();
                generator.close();

@masseyke (Member) commented Jan 6, 2022

@lucebert I branched off of your branch and put some changes (including a new integration test) here: https://github.com/masseyke/elasticsearch-hadoop/tree/feature/add-script-upsert-2. If you have a chance, I'd appreciate hearing if that works for your case.

@lucebert lucebert changed the title add script upsert feature add scripted upsert feature Jan 7, 2022
@lucebert (Contributor, author) commented Jan 7, 2022

@masseyke Unfortunately, I no longer have access to that use case, as I now work for a different client.

However, from memory it was quite similar to this test:

  @Test
  def testScriptedUpsert(): Unit = {
    val testIndex = "scripted_upsert_test"
    val updateParams =
      """
      count: count,
      anotherParam: anotherParam
      """
    val updateScript = "if(params.count == 3) { ctx._source.count = params.count; ctx._source.anotherParam = params.anotherParam; } else { ctx.op = 'noop' }"
    val conf = Map("es.mapping.id" -> "id", "es.mapping.exclude" -> "id", "es.write.operation" -> "upsert", "es.update.script.params" ->
      updateParams, "es.update.script.upsert" -> "true", "es.update.script.inline" -> updateScript)
    val data = Seq(Row("myId", 3, "other"))
    val rdd: RDD[Row] = sc.parallelize(data)
    val schema = new StructType()
      .add("id", StringType, nullable = false)
      .add("count", IntegerType, nullable = false)
      .add("anotherParam", StringType, nullable = false)
    val df = sqc.createDataFrame(rdd, schema)
    df.write.format("es").options(conf).mode(SaveMode.Append).save(testIndex)

    val reader = sqc.read.format("es")
    val readerDf = reader.load(testIndex)
    val resultCount = readerDf.select("count").first().get(0)
    val resultOther = readerDf.select("anotherParam").first().get(0)
    assertEquals(3L, resultCount)
    assertEquals("other", resultOther)
  }

I tried your branch, and it works well.

@masseyke masseyke requested a review from jbaiera January 14, 2022 22:08
@masseyke (Member) left a review:

Looks good to me now (I merged in my branch).

@jbaiera (Member) left a review:

LGTM

@masseyke masseyke merged commit 94eafed into elastic:master Jan 28, 2022

Successfully merging this pull request may close these issues.

[Feature Request] scripted_upsert not supported
3 participants