Skip to content

Commit

Permalink
Elasticsearch: keep path from base URI (#3042)
Browse files Browse the repository at this point in the history
* build endpoint path with DSL
* conditional slash
  • Loading branch information
ennru authored Dec 1, 2023
1 parent 497cc50 commit 2d8b404
Showing 1 changed file with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package akka.stream.alpakka.elasticsearch.impl

import akka.annotation.InternalApi
import akka.http.scaladsl.HttpExt
import akka.http.scaladsl.model.Uri.Path
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.alpakka.elasticsearch._
Expand Down Expand Up @@ -71,15 +70,17 @@ private[elasticsearch] final class ElasticsearchSimpleFlowStage[T, C](
override def onPull(): Unit = tryPull()

override def onPush(): Unit = {
val endpoint = if (settings.allowExplicitIndex) "/_bulk" else s"/${elasticsearchParams.indexName}/_bulk"
val endpoint =
if (settings.allowExplicitIndex) baseUri.path ?/ "_bulk"
else baseUri.path ?/ elasticsearchParams.indexName / "_bulk"
val (messages, resultsPassthrough) = grab(in)
inflight = true
val json: String = restApi.toJson(messages)

log.debug("Posting data to Elasticsearch: {}", json)

if (json.nonEmpty) {
val uri = baseUri.withPath(Path(endpoint))
val uri = baseUri.withPath(endpoint)
val request = HttpRequest(HttpMethods.POST)
.withUri(uri)
.withEntity(HttpEntity(NDJsonProtocol.`application/x-ndjson`, json))
Expand Down

0 comments on commit 2d8b404

Please sign in to comment.