Skip to content

Commit

Permalink
Fix script upsert on bulk requests (#1974)
Browse files Browse the repository at this point in the history
According to the [documentation](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html#scripted_upsert), it's required to add the `scripted_upsert=true` for running a upsert with script.
If the flag is not set, the document is added, but without running the script.

I've added a test that replicates the issue and then fixed it.
  • Loading branch information
dsgrillo authored Oct 25, 2021
1 parent 702427b commit e479379
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* Added `Elastica\Aggregation\NormalizeAggregation` [#1956](https://github.com/ruflin/Elastica/pull/1956)
* Added `Elastica\Suggest\Phrase::addDirectGenerator` to align with ES specification [#1964](https://github.com/ruflin/Elastica/pull/1964)
* Added support for `psr/log` 2.0 and 3.0 [#1971](https://github.com/ruflin/Elastica/pull/1971)
* Added support for scripted upsert on bulk requests [#1974](https://github.com/ruflin/Elastica/pull/1974)
* Added new optional 'case_insensitive' option to `Elastica\Query\Wildcard` [#1894](https://github.com/ruflin/Elastica/pull/1894)
* Added `Elastica\Result::getSort()` fetching the "sort" property of results [#1979](https://github.com/ruflin/Elastica/pull/1979)
* Added exposure of Point-In-Time ID for search responses in `Elastica\ResultSet::getPointInTimeId()` [#1991](https://github.com/ruflin/Elastica/pull/1991)
Expand Down
4 changes: 4 additions & 0 deletions src/Bulk/Action/UpdateDocument.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public function setScript(AbstractScript $script): AbstractDocument

if (!empty($upsert)) {
$source['upsert'] = $upsert;

if ($script->getScriptedUpsert()) {
$source['scripted_upsert'] = true;
}
}
}

Expand Down
15 changes: 15 additions & 0 deletions src/Script/AbstractScript.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ abstract class AbstractScript extends AbstractUpdateAction
*/
private $_lang;

/**
* @var bool|null
*/
private $scriptedUpsert;

/**
* @param string|null $lang Script language, see constants
* @param string|null $documentId Document ID the script action should be performed on (only relevant in update context)
Expand Down Expand Up @@ -136,4 +141,14 @@ private static function _createFromArray(array $data)

throw new InvalidException('Failed to create script. Invalid data passed.');
}

public function setScriptedUpsert(bool $scriptedUpsert): void
{
$this->scriptedUpsert = $scriptedUpsert;
}

public function getScriptedUpsert(): ?bool
{
return $this->scriptedUpsert;
}
}
66 changes: 66 additions & 0 deletions tests/BulkTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,72 @@ public function testUpsert(): void
$this->assertEquals('Maradona', $docData['name']);
}

/**
* @group functional
*/
public function testScriptUpsert(): void
{
$index = $this->_createIndex();
$client = $index->getClient();

$bulk = new Bulk($client);
$bulk->setIndex($index);

$id = 1;
$field = 123456789;
$subField = ['field' => 'a'];
$subField2 = ['field_2' => 'b'];

$defaultData = [
'field' => null,
'sub_field' => [],
'sub_field_2' => [],
];

//insert doc and update field
$script = new Script('ctx._source.field = params.field', ['field' => $field]);
$script->setUpsert($defaultData);
$script->setId($id);
$script->setScriptedUpsert(true);

$action = AbstractDocument::create($script, Action::OP_TYPE_UPDATE);
$bulk->addAction($action);

//update sub_field
$script = new Script('if ( !ctx._source.sub_field.contains(params) ) ctx._source.sub_field.add(params)', $subField);
$script->setUpsert($defaultData);
$script->setId($id);
$script->setScriptedUpsert(true);

$action = AbstractDocument::create($script, Action::OP_TYPE_UPDATE);
$bulk->addAction($action);

//update sub_field_2
$script = new Script('if ( !ctx._source.sub_field_2.contains(params) ) ctx._source.sub_field_2.add(params)', $subField2);
$script->setUpsert($defaultData);
$script->setId($id);
$script->setScriptedUpsert(true);

$action = AbstractDocument::create($script, Action::OP_TYPE_UPDATE);
$bulk->addAction($action);

$response = $bulk->send();

$this->assertTrue($response->isOk());

$index->refresh();

$doc = $index->getDocument($id);

$this->assertEquals($field, $doc->getData()['field']);

$this->assertCount(1, $doc->getData()['sub_field']);
$this->assertCount(1, $doc->getData()['sub_field_2']);

$this->assertEquals($subField['field'], $doc->getData()['sub_field'][0]['field']);
$this->assertEquals($subField2['field_2'], $doc->getData()['sub_field_2'][0]['field_2']);
}

/**
* @group unit
*/
Expand Down

0 comments on commit e479379

Please sign in to comment.