Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Elasticsearch: Fix multiple indexes with no matching #2764

Merged
merged 1 commit into from
Oct 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ private[elasticsearch] case class ScrollResponse[T](error: Option[String], resul
* INTERNAL API
*/
@InternalApi
private[elasticsearch] case class ScrollResult[T](scrollId: String, messages: Seq[ReadResult[T]])
private[elasticsearch] case class ScrollResult[T](scrollId: Option[String], messages: Seq[ReadResult[T]])

/**
* INTERNAL API
Expand Down Expand Up @@ -80,7 +80,7 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T](
with OutHandler
with StageLogging {

private var scrollId: String = null
private var scrollId: Option[String] = None
private val responseHandler = getAsyncCallback[String](handleResponse)
private val failureHandler = getAsyncCallback[Throwable](handleFailure)

Expand All @@ -97,116 +97,120 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T](
try {
waitingForElasticData = true

if (scrollId == null) {
log.debug("Doing initial search")
scrollId match {
case None => {
log.debug("Doing initial search")

// Add extra params to search
val extraParams = Seq(
if (!searchParams.contains("size")) {
Some(("size" -> settings.bufferSize.toString))
} else {
None
},
// Tell elastic to return the documents '_version'-property with the search-results
// http://nocf-www.elastic.co/guide/en/elasticsearch/reference/current/search-request-version.html
// https://www.elastic.co/guide/en/elasticsearch/guide/current/optimistic-concurrency-control.html
if (!searchParams.contains("version") && settings.includeDocumentVersion) {
Some(("version" -> "true"))
} else {
None
}
)

// Add extra params to search
val extraParams = Seq(
if (!searchParams.contains("size")) {
Some(("size" -> settings.bufferSize.toString))
} else {
val baseMap = Map("scroll" -> settings.scroll)

// only force sorting by _doc (meaning order is not known) if not specified in search params
val sortQueryParam = if (searchParams.contains("sort")) {
None
},
// Tell elastic to return the documents '_version'-property with the search-results
// http://nocf-www.elastic.co/guide/en/elasticsearch/reference/current/search-request-version.html
// https://www.elastic.co/guide/en/elasticsearch/guide/current/optimistic-concurrency-control.html
if (!searchParams.contains("version") && settings.includeDocumentVersion) {
Some(("version" -> "true"))
} else {
None
Some(("sort", "_doc"))
}
)

val baseMap = Map("scroll" -> settings.scroll)
val routingQueryParam = searchParams.get("routing").map(r => ("routing", r))

// only force sorting by _doc (meaning order is not known) if not specified in search params
val sortQueryParam = if (searchParams.contains("sort")) {
None
} else {
Some(("sort", "_doc"))
}
val queryParams = baseMap ++ routingQueryParam ++ sortQueryParam
val completeParams = searchParams ++ extraParams.flatten - "routing"

val routingQueryParam = searchParams.get("routing").map(r => ("routing", r))
val searchBody = "{" + completeParams
.map {
case (name, json) =>
"\"" + name + "\":" + json
}
.mkString(",") + "}"

val queryParams = baseMap ++ routingQueryParam ++ sortQueryParam
val completeParams = searchParams ++ extraParams.flatten - "routing"
val endpoint: String = settings.apiVersion match {
case ApiVersion.V5 => s"/${elasticsearchParams.indexName}/${elasticsearchParams.typeName.get}/_search"
case ApiVersion.V7 => s"/${elasticsearchParams.indexName}/_search"
}

val searchBody = "{" + completeParams
.map {
case (name, json) =>
"\"" + name + "\":" + json
val uri = prepareUri(Path(endpoint))
.withQuery(Uri.Query(queryParams))

val request = HttpRequest(HttpMethods.POST)
.withUri(uri)
.withEntity(
HttpEntity(ContentTypes.`application/json`, searchBody)
)
.withHeaders(settings.connection.headers)

ElasticsearchApi
.executeRequest(
request,
settings.connection
)
.flatMap {
case HttpResponse(StatusCodes.OK, _, responseEntity, _) =>
Unmarshal(responseEntity)
.to[String]
.map(json => responseHandler.invoke(json))
case HttpResponse(status, _, responseEntity, _) =>
Unmarshal(responseEntity).to[String].map { body =>
failureHandler
.invoke(new RuntimeException(s"Request failed for POST $uri, got $status with body: $body"))
}
}
.recover {
case cause: Throwable => failureHandler.invoke(cause)
}
.mkString(",") + "}"

val endpoint: String = settings.apiVersion match {
case ApiVersion.V5 => s"/${elasticsearchParams.indexName}/${elasticsearchParams.typeName.get}/_search"
case ApiVersion.V7 => s"/${elasticsearchParams.indexName}/_search"
}

val uri = prepareUri(Path(endpoint))
.withQuery(Uri.Query(queryParams))

val request = HttpRequest(HttpMethods.POST)
.withUri(uri)
.withEntity(
HttpEntity(ContentTypes.`application/json`, searchBody)
)
.withHeaders(settings.connection.headers)

ElasticsearchApi
.executeRequest(
request,
settings.connection
)
.flatMap {
case HttpResponse(StatusCodes.OK, _, responseEntity, _) =>
Unmarshal(responseEntity)
.to[String]
.map(json => responseHandler.invoke(json))
case HttpResponse(status, _, responseEntity, _) =>
Unmarshal(responseEntity).to[String].map { body =>
failureHandler
.invoke(new RuntimeException(s"Request failed for POST $uri, got $status with body: $body"))
}
}
.recover {
case cause: Throwable => failureHandler.invoke(cause)
}
} else {
log.debug("Fetching next scroll")

val uri = prepareUri(Path("/_search/scroll"))

val request = HttpRequest(HttpMethods.POST)
.withUri(uri)
.withEntity(
HttpEntity(ContentTypes.`application/json`,
Map("scroll" -> settings.scroll, "scroll_id" -> scrollId).toJson.compactPrint)
)
.withHeaders(settings.connection.headers)

ElasticsearchApi
.executeRequest(
request,
settings.connection
)
.flatMap {
case HttpResponse(StatusCodes.OK, _, responseEntity, _) =>
Unmarshal(responseEntity)
.to[String]
.map(json => responseHandler.invoke(json))
case HttpResponse(status, _, responseEntity, _) =>
Unmarshal(responseEntity)
.to[String]
.map { body =>
failureHandler.invoke(
new RuntimeException(s"Request failed for POST $uri, got $status with body: $body")
)
}
}
.recover {
case cause: Throwable => failureHandler.invoke(cause)
}
case Some(actualScrollId) => {
log.debug("Fetching next scroll")

val uri = prepareUri(Path("/_search/scroll"))

val request = HttpRequest(HttpMethods.POST)
.withUri(uri)
.withEntity(
HttpEntity(ContentTypes.`application/json`,
Map("scroll" -> settings.scroll, "scroll_id" -> actualScrollId).toJson.compactPrint)
)
.withHeaders(settings.connection.headers)

ElasticsearchApi
.executeRequest(
request,
settings.connection
)
.flatMap {
case HttpResponse(StatusCodes.OK, _, responseEntity, _) =>
Unmarshal(responseEntity)
.to[String]
.map(json => responseHandler.invoke(json))
case HttpResponse(status, _, responseEntity, _) =>
Unmarshal(responseEntity)
.to[String]
.map { body =>
failureHandler.invoke(
new RuntimeException(s"Request failed for POST $uri, got $status with body: $body")
)
}
}
.recover {
case cause: Throwable => failureHandler.invoke(cause)
}
}
}
} catch {
case ex: Exception => failureHandler.invoke(ex)
Expand Down Expand Up @@ -295,40 +299,42 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T](
}

/**
* If the [[scrollId]] is non null, attempt to clear the scroll.
* If the [[scrollId]] is defined, attempt to clear the scroll.
* Complete the stage successfully, whether or not the clear call succeeds.
* If the clear call fails, the scroll will eventually timeout.
*/
def clearScrollAsync(): Unit = {
if (scrollId == null) {
log.debug("Scroll Id is null. Completing stage eagerly.")
completeStage()
} else {
// Clear the scroll
val uri = prepareUri(Path(s"/_search/scroll/$scrollId"))

val request = HttpRequest(HttpMethods.DELETE)
.withUri(uri)
.withHeaders(settings.connection.headers)

ElasticsearchApi
.executeRequest(request, settings.connection)
.flatMap {
case HttpResponse(StatusCodes.OK, _, responseEntity, _) =>
Unmarshal(responseEntity)
.to[String]
.map(json => {
clearScrollAsyncHandler.invoke(Success(json))
})
case HttpResponse(status, _, responseEntity, _) =>
Unmarshal(responseEntity).to[String].map { body =>
clearScrollAsyncHandler
.invoke(Failure(new RuntimeException(s"Request failed for POST $uri, got $status with body: $body")))
}
}
.recover {
case cause: Throwable => failureHandler.invoke(cause)
}
scrollId match {
case None =>
log.debug("Scroll Id is empty. Completing stage eagerly.")
completeStage()
case Some(actualScrollId) => {
// Clear the scroll
val uri = prepareUri(Path(s"/_search/scroll/$actualScrollId"))

val request = HttpRequest(HttpMethods.DELETE)
.withUri(uri)
.withHeaders(settings.connection.headers)

ElasticsearchApi
.executeRequest(request, settings.connection)
.flatMap {
case HttpResponse(StatusCodes.OK, _, responseEntity, _) =>
Unmarshal(responseEntity)
.to[String]
.map(json => {
clearScrollAsyncHandler.invoke(Success(json))
})
case HttpResponse(status, _, responseEntity, _) =>
Unmarshal(responseEntity).to[String].map { body =>
clearScrollAsyncHandler
.invoke(Failure(new RuntimeException(s"Request failed for POST $uri, got $status with body: $body")))
}
}
.recover {
case cause: Throwable => failureHandler.invoke(cause)
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ object ElasticsearchSource {
if (jsonTree.has("error")) {
impl.ScrollResponse(Some(jsonTree.get("error").asText()), None)
} else {
val scrollId = jsonTree.get("_scroll_id").asText()
val scrollId = Option(jsonTree.get("_scroll_id")).map(_.asText())
val hits = jsonTree.get("hits").get("hits").asInstanceOf[ArrayNode]
val messages = hits.elements().asScala.toList.map { element =>
val id = element.get("_id").asText()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ object ElasticsearchSource {
impl.ScrollResponse(Some(error.toString), None)
}
case None => {
val scrollId = jsObj.fields("_scroll_id").asInstanceOf[JsString].value
val scrollId = jsObj.fields.get("_scroll_id").map(v => v.asInstanceOf[JsString].value)
val hits = jsObj.fields("hits").asJsObject.fields("hits").asInstanceOf[JsArray]
val messages = hits.elements.map { element =>
val doc = element.asJsObject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@RunWith(value = Parameterized.class)
public class ElasticsearchParameterizedTest extends ElasticsearchTestBase {
Expand Down Expand Up @@ -193,6 +195,25 @@ public void testUsingVersionType() throws Exception {
assertEquals(externalVersion, message.version().get());
}

@Test
public void testMultipleIndicesWithNoMatching() throws Exception {
String indexName = "missing-*";
String typeName = "_doc";

// Assert that the document's external version is saved
List<ReadResult<Book>> readResults =
ElasticsearchSource.<Book>typed(
constructElasticsearchParams(indexName, typeName, apiVersion),
"{\"match_all\": {}}",
ElasticsearchSourceSettings.create(connectionSettings).withApiVersion(apiVersion),
Book.class)
.runWith(Sink.seq(), system)
.toCompletableFuture()
.get();

assertTrue(readResults.isEmpty());
}

public void compileOnlySample() {
String doc = "dummy-doc";

Expand Down
Loading